diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-10-19 15:28:42 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-10-21 09:18:49 +0200 |
commit | ba7c8b890c45d735216888204ec88882ef58c918 (patch) | |
tree | cb00c9b51e5353ba3726216679c81c0e2fe9ac35 | |
parent | da1c86b80f230c3a2023f97c0048020a12e38de4 (diff) | |
download | sink-ba7c8b890c45d735216888204ec88882ef58c918.tar.gz sink-ba7c8b890c45d735216888204ec88882ef58c918.zip |
Ported the pipeline to the entitystore
26 files changed, 378 insertions, 533 deletions
diff --git a/common/domain/event.cpp b/common/domain/event.cpp index d50652d..41ec625 100644 --- a/common/domain/event.cpp +++ b/common/domain/event.cpp | |||
@@ -47,27 +47,6 @@ void TypeImplementation<Event>::configureIndex(TypeIndex &index) | |||
47 | index.addProperty<QByteArray>(Event::Uid::name); | 47 | index.addProperty<QByteArray>(Event::Uid::name); |
48 | } | 48 | } |
49 | 49 | ||
50 | static TypeIndex &getIndex() | ||
51 | { | ||
52 | QMutexLocker locker(&sMutex); | ||
53 | static TypeIndex *index = 0; | ||
54 | if (!index) { | ||
55 | index = new TypeIndex("event"); | ||
56 | TypeImplementation<Event>::configureIndex(*index); | ||
57 | } | ||
58 | return *index; | ||
59 | } | ||
60 | |||
61 | void TypeImplementation<Event>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) | ||
62 | { | ||
63 | return getIndex().add(identifier, bufferAdaptor, transaction); | ||
64 | } | ||
65 | |||
66 | void TypeImplementation<Event>::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) | ||
67 | { | ||
68 | return getIndex().remove(identifier, bufferAdaptor, transaction); | ||
69 | } | ||
70 | |||
71 | QSharedPointer<ReadPropertyMapper<TypeImplementation<Event>::Buffer> > TypeImplementation<Event>::initializeReadPropertyMapper() | 50 | QSharedPointer<ReadPropertyMapper<TypeImplementation<Event>::Buffer> > TypeImplementation<Event>::initializeReadPropertyMapper() |
72 | { | 51 | { |
73 | auto propertyMapper = QSharedPointer<ReadPropertyMapper<Buffer> >::create(); | 52 | auto propertyMapper = QSharedPointer<ReadPropertyMapper<Buffer> >::create(); |
diff --git a/common/domain/event.h b/common/domain/event.h index 18e0f20..fbaf4ed 100644 --- a/common/domain/event.h +++ b/common/domain/event.h | |||
@@ -56,8 +56,6 @@ public: | |||
56 | typedef Sink::ApplicationDomain::Buffer::EventBuilder BufferBuilder; | 56 | typedef Sink::ApplicationDomain::Buffer::EventBuilder BufferBuilder; |
57 | static void configureIndex(TypeIndex &index); | 57 | static void configureIndex(TypeIndex &index); |
58 | static QSet<QByteArray> indexedProperties(); | 58 | static QSet<QByteArray> indexedProperties(); |
59 | static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); | ||
60 | static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); | ||
61 | static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); | 59 | static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); |
62 | static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); | 60 | static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); |
63 | }; | 61 | }; |
diff --git a/common/domain/folder.cpp b/common/domain/folder.cpp index 94727a3..058035a 100644 --- a/common/domain/folder.cpp +++ b/common/domain/folder.cpp | |||
@@ -50,28 +50,6 @@ void TypeImplementation<Folder>::configureIndex(TypeIndex &index) | |||
50 | index.addProperty<QString>(Folder::Name::name); | 50 | index.addProperty<QString>(Folder::Name::name); |
51 | } | 51 | } |
52 | 52 | ||
53 | static TypeIndex &getIndex() | ||
54 | { | ||
55 | QMutexLocker locker(&sMutex); | ||
56 | static TypeIndex *index = 0; | ||
57 | if (!index) { | ||
58 | index = new TypeIndex("folder"); | ||
59 | TypeImplementation<Folder>::configureIndex(*index); | ||
60 | } | ||
61 | return *index; | ||
62 | } | ||
63 | |||
64 | void TypeImplementation<Folder>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) | ||
65 | { | ||
66 | SinkTrace() << "Indexing " << identifier; | ||
67 | getIndex().add(identifier, bufferAdaptor, transaction); | ||
68 | } | ||
69 | |||
70 | void TypeImplementation<Folder>::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) | ||
71 | { | ||
72 | getIndex().remove(identifier, bufferAdaptor, transaction); | ||
73 | } | ||
74 | |||
75 | QSharedPointer<ReadPropertyMapper<TypeImplementation<Folder>::Buffer> > TypeImplementation<Folder>::initializeReadPropertyMapper() | 53 | QSharedPointer<ReadPropertyMapper<TypeImplementation<Folder>::Buffer> > TypeImplementation<Folder>::initializeReadPropertyMapper() |
76 | { | 54 | { |
77 | auto propertyMapper = QSharedPointer<ReadPropertyMapper<Buffer> >::create(); | 55 | auto propertyMapper = QSharedPointer<ReadPropertyMapper<Buffer> >::create(); |
diff --git a/common/domain/folder.h b/common/domain/folder.h index ea0d79a..47a544b 100644 --- a/common/domain/folder.h +++ b/common/domain/folder.h | |||
@@ -50,8 +50,6 @@ public: | |||
50 | typedef Sink::ApplicationDomain::Buffer::FolderBuilder BufferBuilder; | 50 | typedef Sink::ApplicationDomain::Buffer::FolderBuilder BufferBuilder; |
51 | static void configureIndex(TypeIndex &index); | 51 | static void configureIndex(TypeIndex &index); |
52 | static QSet<QByteArray> indexedProperties(); | 52 | static QSet<QByteArray> indexedProperties(); |
53 | static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); | ||
54 | static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); | ||
55 | static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); | 53 | static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); |
56 | static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); | 54 | static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); |
57 | }; | 55 | }; |
diff --git a/common/domain/mail.cpp b/common/domain/mail.cpp index b0a3aae..9d58767 100644 --- a/common/domain/mail.cpp +++ b/common/domain/mail.cpp | |||
@@ -170,19 +170,6 @@ static void updateThreadingIndex(const QByteArray &identifier, const BufferAdapt | |||
170 | } | 170 | } |
171 | } | 171 | } |
172 | 172 | ||
173 | void TypeImplementation<Mail>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) | ||
174 | { | ||
175 | SinkTrace() << "Indexing " << identifier; | ||
176 | getIndex().add(identifier, bufferAdaptor, transaction); | ||
177 | updateThreadingIndex(identifier, bufferAdaptor, transaction); | ||
178 | } | ||
179 | |||
180 | void TypeImplementation<Mail>::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) | ||
181 | { | ||
182 | getIndex().remove(identifier, bufferAdaptor, transaction); | ||
183 | //TODO cleanup threading index | ||
184 | } | ||
185 | |||
186 | QSharedPointer<ReadPropertyMapper<TypeImplementation<Mail>::Buffer> > TypeImplementation<Mail>::initializeReadPropertyMapper() | 173 | QSharedPointer<ReadPropertyMapper<TypeImplementation<Mail>::Buffer> > TypeImplementation<Mail>::initializeReadPropertyMapper() |
187 | { | 174 | { |
188 | auto propertyMapper = QSharedPointer<ReadPropertyMapper<Buffer> >::create(); | 175 | auto propertyMapper = QSharedPointer<ReadPropertyMapper<Buffer> >::create(); |
diff --git a/common/domain/mail.h b/common/domain/mail.h index 81a0d1c..c0cfc55 100644 --- a/common/domain/mail.h +++ b/common/domain/mail.h | |||
@@ -50,8 +50,6 @@ public: | |||
50 | typedef Sink::ApplicationDomain::Buffer::MailBuilder BufferBuilder; | 50 | typedef Sink::ApplicationDomain::Buffer::MailBuilder BufferBuilder; |
51 | static void configureIndex(TypeIndex &index); | 51 | static void configureIndex(TypeIndex &index); |
52 | static QSet<QByteArray> indexedProperties(); | 52 | static QSet<QByteArray> indexedProperties(); |
53 | static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); | ||
54 | static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); | ||
55 | static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); | 53 | static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); |
56 | static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); | 54 | static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); |
57 | }; | 55 | }; |
diff --git a/common/domainadaptor.h b/common/domainadaptor.h index 6a9d755..195d5ec 100644 --- a/common/domainadaptor.h +++ b/common/domainadaptor.h | |||
@@ -87,11 +87,11 @@ static void createBufferPartBuffer(const Sink::ApplicationDomain::ApplicationDom | |||
87 | * A generic adaptor implementation that uses a property mapper to read/write values. | 87 | * A generic adaptor implementation that uses a property mapper to read/write values. |
88 | */ | 88 | */ |
89 | template <class LocalBuffer, class ResourceBuffer> | 89 | template <class LocalBuffer, class ResourceBuffer> |
90 | class GenericBufferAdaptor : public Sink::ApplicationDomain::BufferAdaptor | 90 | class DatastoreBufferAdaptor : public Sink::ApplicationDomain::BufferAdaptor |
91 | { | 91 | { |
92 | SINK_DEBUG_AREA("bufferadaptor") | 92 | SINK_DEBUG_AREA("bufferadaptor") |
93 | public: | 93 | public: |
94 | GenericBufferAdaptor() : BufferAdaptor() | 94 | DatastoreBufferAdaptor() : BufferAdaptor() |
95 | { | 95 | { |
96 | } | 96 | } |
97 | 97 | ||
@@ -148,18 +148,14 @@ public: | |||
148 | /** | 148 | /** |
149 | * Creates an adaptor for the given domain and resource types. | 149 | * Creates an adaptor for the given domain and resource types. |
150 | * | 150 | * |
151 | * This returns by default a GenericBufferAdaptor initialized with the corresponding property mappers. | 151 | * This returns by default a DatastoreBufferAdaptor initialized with the corresponding property mappers. |
152 | */ | 152 | */ |
153 | virtual QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> createAdaptor(const Sink::Entity &entity) Q_DECL_OVERRIDE | 153 | virtual QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> createAdaptor(const Sink::Entity &entity) Q_DECL_OVERRIDE |
154 | { | 154 | { |
155 | const auto resourceBuffer = Sink::EntityBuffer::readBuffer<ResourceBuffer>(entity.resource()); | 155 | auto adaptor = QSharedPointer<DatastoreBufferAdaptor<LocalBuffer, ResourceBuffer>>::create(); |
156 | const auto localBuffer = Sink::EntityBuffer::readBuffer<LocalBuffer>(entity.local()); | 156 | adaptor->mLocalBuffer = Sink::EntityBuffer::readBuffer<LocalBuffer>(entity.local()); |
157 | // const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); | ||
158 | |||
159 | auto adaptor = QSharedPointer<GenericBufferAdaptor<LocalBuffer, ResourceBuffer>>::create(); | ||
160 | adaptor->mLocalBuffer = localBuffer; | ||
161 | adaptor->mLocalMapper = mLocalMapper; | 157 | adaptor->mLocalMapper = mLocalMapper; |
162 | adaptor->mResourceBuffer = resourceBuffer; | 158 | adaptor->mResourceBuffer = Sink::EntityBuffer::readBuffer<ResourceBuffer>(entity.resource()); |
163 | adaptor->mResourceMapper = mResourceMapper; | 159 | adaptor->mResourceMapper = mResourceMapper; |
164 | return adaptor; | 160 | return adaptor; |
165 | } | 161 | } |
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index e0d395a..8f08f3d 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -59,9 +59,9 @@ class CommandProcessor : public QObject | |||
59 | public: | 59 | public: |
60 | CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) | 60 | CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) |
61 | { | 61 | { |
62 | mLowerBoundRevision = DataStore::maxRevision(mPipeline->storage().createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { | 62 | mPipeline->startTransaction(); |
63 | SinkWarning() << error.message; | 63 | mLowerBoundRevision = mPipeline->revision(); |
64 | })); | 64 | mPipeline->commit(); |
65 | 65 | ||
66 | for (auto queue : mCommandQueues) { | 66 | for (auto queue : mCommandQueues) { |
67 | const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); | 67 | const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); |
diff --git a/common/indexupdater.h b/common/indexupdater.h deleted file mode 100644 index 221a4ed..0000000 --- a/common/indexupdater.h +++ /dev/null | |||
@@ -1,91 +0,0 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm> | ||
3 | * | ||
4 | * This program is free software; you can redistribute it and/or modify | ||
5 | * it under the terms of the GNU General Public License as published by | ||
6 | * the Free Software Foundation; either version 2 of the License, or | ||
7 | * (at your option) any later version. | ||
8 | * | ||
9 | * This program is distributed in the hope that it will be useful, | ||
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
12 | * GNU General Public License for more details. | ||
13 | * | ||
14 | * You should have received a copy of the GNU General Public License | ||
15 | * along with this program; if not, write to the | ||
16 | * Free Software Foundation, Inc., | ||
17 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | ||
18 | */ | ||
19 | #pragma once | ||
20 | |||
21 | #include <pipeline.h> | ||
22 | #include <index.h> | ||
23 | |||
24 | class IndexUpdater : public Sink::Preprocessor | ||
25 | { | ||
26 | public: | ||
27 | IndexUpdater(const QByteArray &index, const QByteArray &type, const QByteArray &property) : mIndexIdentifier(index), mBufferType(type), mProperty(property) | ||
28 | { | ||
29 | } | ||
30 | |||
31 | void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | ||
32 | { | ||
33 | add(newEntity.getProperty(mProperty), uid, transaction); | ||
34 | } | ||
35 | |||
36 | void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, | ||
37 | Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | ||
38 | { | ||
39 | remove(oldEntity.getProperty(mProperty), uid, transaction); | ||
40 | add(newEntity.getProperty(mProperty), uid, transaction); | ||
41 | } | ||
42 | |||
43 | void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | ||
44 | { | ||
45 | remove(oldEntity.getProperty(mProperty), uid, transaction); | ||
46 | } | ||
47 | |||
48 | private: | ||
49 | void add(const QVariant &value, const QByteArray &uid, Sink::Storage::DataStore::Transaction &transaction) | ||
50 | { | ||
51 | if (value.isValid()) { | ||
52 | Index(mIndexIdentifier, transaction).add(value.toByteArray(), uid); | ||
53 | } | ||
54 | } | ||
55 | |||
56 | void remove(const QVariant &value, const QByteArray &uid, Sink::Storage::DataStore::Transaction &transaction) | ||
57 | { | ||
58 | if (value.isValid()) { | ||
59 | const auto data = value.toByteArray(); | ||
60 | if (!data.isEmpty()) { | ||
61 | Index(mIndexIdentifier, transaction).remove(data, uid); | ||
62 | } | ||
63 | } | ||
64 | } | ||
65 | |||
66 | QByteArray mIndexIdentifier; | ||
67 | QByteArray mBufferType; | ||
68 | QByteArray mProperty; | ||
69 | }; | ||
70 | |||
71 | template <typename DomainType> | ||
72 | class DefaultIndexUpdater : public Sink::Preprocessor | ||
73 | { | ||
74 | public: | ||
75 | void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | ||
76 | { | ||
77 | Sink::ApplicationDomain::TypeImplementation<DomainType>::index(uid, newEntity, transaction); | ||
78 | } | ||
79 | |||
80 | void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, | ||
81 | Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | ||
82 | { | ||
83 | Sink::ApplicationDomain::TypeImplementation<DomainType>::removeIndex(uid, oldEntity, transaction); | ||
84 | Sink::ApplicationDomain::TypeImplementation<DomainType>::index(uid, newEntity, transaction); | ||
85 | } | ||
86 | |||
87 | void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | ||
88 | { | ||
89 | Sink::ApplicationDomain::TypeImplementation<DomainType>::removeIndex(uid, oldEntity, transaction); | ||
90 | } | ||
91 | }; | ||
diff --git a/common/mailpreprocessor.cpp b/common/mailpreprocessor.cpp index b978323..d45afe6 100644 --- a/common/mailpreprocessor.cpp +++ b/common/mailpreprocessor.cpp | |||
@@ -116,7 +116,7 @@ static void updatedIndexedProperties(Sink::ApplicationDomain::Mail &mail, KMime: | |||
116 | } | 116 | } |
117 | } | 117 | } |
118 | 118 | ||
119 | void MailPropertyExtractor::newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction) | 119 | void MailPropertyExtractor::newEntity(Sink::ApplicationDomain::Mail &mail) |
120 | { | 120 | { |
121 | MimeMessageReader mimeMessageReader(getFilePathFromMimeMessagePath(mail.getMimeMessagePath())); | 121 | MimeMessageReader mimeMessageReader(getFilePathFromMimeMessagePath(mail.getMimeMessagePath())); |
122 | auto msg = mimeMessageReader.mimeMessage(); | 122 | auto msg = mimeMessageReader.mimeMessage(); |
@@ -125,7 +125,7 @@ void MailPropertyExtractor::newEntity(Sink::ApplicationDomain::Mail &mail, Sink: | |||
125 | } | 125 | } |
126 | } | 126 | } |
127 | 127 | ||
128 | void MailPropertyExtractor::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail,Sink::Storage::DataStore::Transaction &transaction) | 128 | void MailPropertyExtractor::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail) |
129 | { | 129 | { |
130 | MimeMessageReader mimeMessageReader(getFilePathFromMimeMessagePath(newMail.getMimeMessagePath())); | 130 | MimeMessageReader mimeMessageReader(getFilePathFromMimeMessagePath(newMail.getMimeMessagePath())); |
131 | auto msg = mimeMessageReader.mimeMessage(); | 131 | auto msg = mimeMessageReader.mimeMessage(); |
@@ -161,21 +161,21 @@ QString MimeMessageMover::moveMessage(const QString &oldPath, const Sink::Applic | |||
161 | return oldPath; | 161 | return oldPath; |
162 | } | 162 | } |
163 | 163 | ||
164 | void MimeMessageMover::newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction) | 164 | void MimeMessageMover::newEntity(Sink::ApplicationDomain::Mail &mail) |
165 | { | 165 | { |
166 | if (!mail.getMimeMessagePath().isEmpty()) { | 166 | if (!mail.getMimeMessagePath().isEmpty()) { |
167 | mail.setMimeMessagePath(moveMessage(mail.getMimeMessagePath(), mail)); | 167 | mail.setMimeMessagePath(moveMessage(mail.getMimeMessagePath(), mail)); |
168 | } | 168 | } |
169 | } | 169 | } |
170 | 170 | ||
171 | void MimeMessageMover::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail, Sink::Storage::DataStore::Transaction &transaction) | 171 | void MimeMessageMover::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail) |
172 | { | 172 | { |
173 | if (!newMail.getMimeMessagePath().isEmpty()) { | 173 | if (!newMail.getMimeMessagePath().isEmpty()) { |
174 | newMail.setMimeMessagePath(moveMessage(newMail.getMimeMessagePath(), newMail)); | 174 | newMail.setMimeMessagePath(moveMessage(newMail.getMimeMessagePath(), newMail)); |
175 | } | 175 | } |
176 | } | 176 | } |
177 | 177 | ||
178 | void MimeMessageMover::deletedEntity(const Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction) | 178 | void MimeMessageMover::deletedEntity(const Sink::ApplicationDomain::Mail &mail) |
179 | { | 179 | { |
180 | QFile::remove(mail.getMimeMessagePath()); | 180 | QFile::remove(mail.getMimeMessagePath()); |
181 | } | 181 | } |
diff --git a/common/mailpreprocessor.h b/common/mailpreprocessor.h index c66517e..f979f22 100644 --- a/common/mailpreprocessor.h +++ b/common/mailpreprocessor.h | |||
@@ -24,8 +24,8 @@ class SINK_EXPORT MailPropertyExtractor : public Sink::EntityPreprocessor<Sink:: | |||
24 | { | 24 | { |
25 | public: | 25 | public: |
26 | virtual ~MailPropertyExtractor(){} | 26 | virtual ~MailPropertyExtractor(){} |
27 | virtual void newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE; | 27 | virtual void newEntity(Sink::ApplicationDomain::Mail &mail) Q_DECL_OVERRIDE; |
28 | virtual void modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail,Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE; | 28 | virtual void modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail) Q_DECL_OVERRIDE; |
29 | protected: | 29 | protected: |
30 | virtual QString getFilePathFromMimeMessagePath(const QString &) const; | 30 | virtual QString getFilePathFromMimeMessagePath(const QString &) const; |
31 | }; | 31 | }; |
@@ -36,9 +36,9 @@ public: | |||
36 | MimeMessageMover(); | 36 | MimeMessageMover(); |
37 | virtual ~MimeMessageMover(){} | 37 | virtual ~MimeMessageMover(){} |
38 | 38 | ||
39 | void newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE; | 39 | void newEntity(Sink::ApplicationDomain::Mail &mail) Q_DECL_OVERRIDE; |
40 | void modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE; | 40 | void modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail) Q_DECL_OVERRIDE; |
41 | void deletedEntity(const Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE; | 41 | void deletedEntity(const Sink::ApplicationDomain::Mail &mail) Q_DECL_OVERRIDE; |
42 | 42 | ||
43 | private: | 43 | private: |
44 | QString moveMessage(const QString &oldPath, const Sink::ApplicationDomain::Mail &mail); | 44 | QString moveMessage(const QString &oldPath, const Sink::ApplicationDomain::Mail &mail); |
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index e257857..ea59ae9 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -37,6 +37,7 @@ | |||
37 | #include "adaptorfactoryregistry.h" | 37 | #include "adaptorfactoryregistry.h" |
38 | #include "definitions.h" | 38 | #include "definitions.h" |
39 | #include "bufferutils.h" | 39 | #include "bufferutils.h" |
40 | #include "storage/entitystore.h" | ||
40 | 41 | ||
41 | SINK_DEBUG_AREA("pipeline") | 42 | SINK_DEBUG_AREA("pipeline") |
42 | 43 | ||
@@ -46,31 +47,18 @@ using namespace Sink::Storage; | |||
46 | class Pipeline::Private | 47 | class Pipeline::Private |
47 | { | 48 | { |
48 | public: | 49 | public: |
49 | Private(const ResourceContext &context) : resourceContext(context), storage(Sink::storageLocation(), context.instanceId(), DataStore::ReadWrite), revisionChanged(false) | 50 | Private(const ResourceContext &context) : resourceContext(context), entityStore(context), revisionChanged(false) |
50 | { | 51 | { |
51 | } | 52 | } |
52 | 53 | ||
53 | ResourceContext resourceContext; | 54 | ResourceContext resourceContext; |
54 | DataStore storage; | 55 | Storage::EntityStore entityStore; |
55 | DataStore::Transaction transaction; | ||
56 | QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors; | 56 | QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors; |
57 | bool revisionChanged; | 57 | bool revisionChanged; |
58 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); | ||
59 | QTime transactionTime; | 58 | QTime transactionTime; |
60 | int transactionItemCount; | 59 | int transactionItemCount; |
61 | }; | 60 | }; |
62 | 61 | ||
63 | void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) | ||
64 | { | ||
65 | SinkTrace() << "Committing new revision: " << uid << newRevision; | ||
66 | DataStore::mainDatabase(transaction, bufferType) | ||
67 | .write(DataStore::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), | ||
68 | [uid, newRevision](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; }); | ||
69 | revisionChanged = true; | ||
70 | DataStore::setMaxRevision(transaction, newRevision); | ||
71 | DataStore::recordRevision(transaction, newRevision, uid, bufferType); | ||
72 | } | ||
73 | |||
74 | 62 | ||
75 | Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Private(context)) | 63 | Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Private(context)) |
76 | { | 64 | { |
@@ -78,7 +66,6 @@ Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Pri | |||
78 | 66 | ||
79 | Pipeline::~Pipeline() | 67 | Pipeline::~Pipeline() |
80 | { | 68 | { |
81 | d->transaction = DataStore::Transaction(); | ||
82 | } | 69 | } |
83 | 70 | ||
84 | void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) | 71 | void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) |
@@ -98,27 +85,10 @@ void Pipeline::startTransaction() | |||
98 | // for (auto processor : d->processors[bufferType]) { | 85 | // for (auto processor : d->processors[bufferType]) { |
99 | // processor->startBatch(); | 86 | // processor->startBatch(); |
100 | // } | 87 | // } |
101 | if (d->transaction) { | ||
102 | return; | ||
103 | } | ||
104 | SinkTrace() << "Starting transaction."; | 88 | SinkTrace() << "Starting transaction."; |
105 | d->transactionTime.start(); | 89 | d->transactionTime.start(); |
106 | d->transactionItemCount = 0; | 90 | d->transactionItemCount = 0; |
107 | d->transaction = storage().createTransaction(DataStore::ReadWrite, [](const DataStore::Error &error) { | 91 | d->entityStore.startTransaction(DataStore::ReadWrite); |
108 | SinkWarning() << error.message; | ||
109 | }); | ||
110 | |||
111 | //FIXME this is a temporary measure to recover from a failure to open the named databases correctly. | ||
112 | //Once the actual problem is fixed it will be enough to simply crash if we open the wrong database (which we check in openDatabase already). | ||
113 | //It seems like the validateNamedDatabase calls actually stops the mdb_put failures during sync... | ||
114 | if (d->storage.exists()) { | ||
115 | while (!d->transaction.validateNamedDatabases()) { | ||
116 | SinkWarning() << "Opened an invalid transaction!!!!!!"; | ||
117 | d->transaction = storage().createTransaction(DataStore::ReadWrite, [](const DataStore::Error &error) { | ||
118 | SinkWarning() << error.message; | ||
119 | }); | ||
120 | } | ||
121 | } | ||
122 | } | 92 | } |
123 | 93 | ||
124 | void Pipeline::commit() | 94 | void Pipeline::commit() |
@@ -129,34 +99,20 @@ void Pipeline::commit() | |||
129 | // processor->finalize(); | 99 | // processor->finalize(); |
130 | // } | 100 | // } |
131 | if (!d->revisionChanged) { | 101 | if (!d->revisionChanged) { |
132 | d->transaction.abort(); | 102 | d->entityStore.abortTransaction(); |
133 | d->transaction = DataStore::Transaction(); | ||
134 | return; | 103 | return; |
135 | } | 104 | } |
136 | const auto revision = DataStore::maxRevision(d->transaction); | 105 | const auto revision = d->entityStore.maxRevision(); |
137 | const auto elapsed = d->transactionTime.elapsed(); | 106 | const auto elapsed = d->transactionTime.elapsed(); |
138 | SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " | 107 | SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " |
139 | << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; | 108 | << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; |
140 | if (d->transaction) { | 109 | d->entityStore.commitTransaction(); |
141 | d->transaction.commit(); | ||
142 | } | ||
143 | d->transaction = DataStore::Transaction(); | ||
144 | if (d->revisionChanged) { | 110 | if (d->revisionChanged) { |
145 | d->revisionChanged = false; | 111 | d->revisionChanged = false; |
146 | emit revisionUpdated(revision); | 112 | emit revisionUpdated(revision); |
147 | } | 113 | } |
148 | } | 114 | } |
149 | 115 | ||
150 | DataStore::Transaction &Pipeline::transaction() | ||
151 | { | ||
152 | return d->transaction; | ||
153 | } | ||
154 | |||
155 | DataStore &Pipeline::storage() const | ||
156 | { | ||
157 | return d->storage; | ||
158 | } | ||
159 | |||
160 | KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | 116 | KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) |
161 | { | 117 | { |
162 | d->transactionItemCount++; | 118 | d->transactionItemCount++; |
@@ -175,7 +131,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
175 | QByteArray key; | 131 | QByteArray key; |
176 | if (createEntity->entityId()) { | 132 | if (createEntity->entityId()) { |
177 | key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); | 133 | key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); |
178 | if (DataStore::mainDatabase(d->transaction, bufferType).contains(key)) { | 134 | if (d->entityStore.contains(bufferType, key)) { |
179 | SinkError() << "An entity with this id already exists: " << key; | 135 | SinkError() << "An entity with this id already exists: " << key; |
180 | return KAsync::error<qint64>(0); | 136 | return KAsync::error<qint64>(0); |
181 | } | 137 | } |
@@ -208,29 +164,23 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
208 | 164 | ||
209 | auto adaptor = adaptorFactory->createAdaptor(*entity); | 165 | auto adaptor = adaptorFactory->createAdaptor(*entity); |
210 | auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); | 166 | auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); |
211 | foreach (const auto &processor, d->processors[bufferType]) { | ||
212 | processor->newEntity(key, DataStore::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction); | ||
213 | } | ||
214 | //The maxRevision may have changed meanwhile if the entity created sub-entities | ||
215 | const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; | ||
216 | |||
217 | // Add metadata buffer | ||
218 | flatbuffers::FlatBufferBuilder metadataFbb; | ||
219 | auto metadataBuilder = MetadataBuilder(metadataFbb); | ||
220 | metadataBuilder.add_revision(newRevision); | ||
221 | metadataBuilder.add_operation(Operation_Creation); | ||
222 | metadataBuilder.add_replayToSource(replayToSource); | ||
223 | auto metadataBuffer = metadataBuilder.Finish(); | ||
224 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
225 | 167 | ||
226 | flatbuffers::FlatBufferBuilder fbb; | 168 | d->revisionChanged = true; |
227 | adaptorFactory->createBuffer(memoryAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | 169 | auto revision = d->entityStore.maxRevision(); |
170 | auto o = Sink::ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), key, revision, memoryAdaptor}; | ||
171 | o.setChangedProperties(o.availableProperties().toSet()); | ||
228 | 172 | ||
229 | d->storeNewRevision(newRevision, fbb, bufferType, key); | 173 | auto preprocess = [&, this](ApplicationDomain::ApplicationDomainType &newEntity) { |
174 | foreach (const auto &processor, d->processors[bufferType]) { | ||
175 | processor->newEntity(newEntity); | ||
176 | } | ||
177 | }; | ||
230 | 178 | ||
231 | //FIXME entityStore->create(bufferType, memoryAdaptor, replayToSource) | 179 | if (!d->entityStore.add(bufferType, o, replayToSource, preprocess)) { |
180 | return KAsync::error<qint64>(0); | ||
181 | } | ||
232 | 182 | ||
233 | return KAsync::value(newRevision); | 183 | return KAsync::value(d->entityStore.maxRevision()); |
234 | } | 184 | } |
235 | 185 | ||
236 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | 186 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) |
@@ -254,6 +204,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
254 | } | 204 | } |
255 | const qint64 baseRevision = modifyEntity->revision(); | 205 | const qint64 baseRevision = modifyEntity->revision(); |
256 | const bool replayToSource = modifyEntity->replayToSource(); | 206 | const bool replayToSource = modifyEntity->replayToSource(); |
207 | |||
257 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); | 208 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); |
258 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); | 209 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); |
259 | SinkTrace() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; | 210 | SinkTrace() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; |
@@ -269,7 +220,6 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
269 | } | 220 | } |
270 | } | 221 | } |
271 | 222 | ||
272 | // TODO use only readPropertyMapper and writePropertyMapper | ||
273 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); | 223 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); |
274 | if (!adaptorFactory) { | 224 | if (!adaptorFactory) { |
275 | SinkWarning() << "no adaptor factory for type " << bufferType; | 225 | SinkWarning() << "no adaptor factory for type " << bufferType; |
@@ -278,72 +228,26 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
278 | 228 | ||
279 | auto diffEntity = GetEntity(modifyEntity->delta()->Data()); | 229 | auto diffEntity = GetEntity(modifyEntity->delta()->Data()); |
280 | Q_ASSERT(diffEntity); | 230 | Q_ASSERT(diffEntity); |
281 | auto diff = adaptorFactory->createAdaptor(*diffEntity); | 231 | Sink::ApplicationDomain::ApplicationDomainType diff{d->resourceContext.instanceId(), key, baseRevision, adaptorFactory->createAdaptor(*diffEntity)}; |
282 | 232 | diff.setChangedProperties(changeset.toSet()); | |
283 | QSharedPointer<ApplicationDomain::BufferAdaptor> current; | ||
284 | DataStore::mainDatabase(d->transaction, bufferType) | ||
285 | .findLatest(key, | ||
286 | [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | ||
287 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
288 | if (!buffer.isValid()) { | ||
289 | SinkWarning() << "Read invalid buffer from disk"; | ||
290 | } else { | ||
291 | current = adaptorFactory->createAdaptor(buffer.entity()); | ||
292 | } | ||
293 | return false; | ||
294 | }, | ||
295 | [baseRevision](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; }); | ||
296 | |||
297 | if (!current) { | ||
298 | SinkWarning() << "Failed to read local value " << key; | ||
299 | return KAsync::error<qint64>(0); | ||
300 | } | ||
301 | |||
302 | auto newAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(current), current->availableProperties()); | ||
303 | |||
304 | // Apply diff | ||
305 | // FIXME only apply the properties that are available in the buffer | ||
306 | SinkTrace() << "Applying changed properties: " << changeset; | ||
307 | for (const auto &property : changeset) { | ||
308 | const auto value = diff->getProperty(property); | ||
309 | if (value.isValid()) { | ||
310 | newAdaptor->setProperty(property, value); | ||
311 | } | ||
312 | } | ||
313 | 233 | ||
314 | // Remove deletions | 234 | QByteArrayList deletions; |
315 | if (modifyEntity->deletions()) { | 235 | if (modifyEntity->deletions()) { |
316 | for (const flatbuffers::String *property : *modifyEntity->deletions()) { | 236 | deletions = BufferUtils::fromVector(*modifyEntity->deletions()); |
317 | newAdaptor->setProperty(BufferUtils::extractBuffer(property), QVariant()); | ||
318 | } | ||
319 | } | 237 | } |
320 | 238 | ||
321 | newAdaptor->resetChangedProperties(); | 239 | auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) { |
322 | foreach (const auto &processor, d->processors[bufferType]) { | 240 | foreach (const auto &processor, d->processors[bufferType]) { |
323 | processor->modifiedEntity(key, DataStore::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction); | 241 | processor->modifiedEntity(oldEntity, newEntity); |
324 | } | 242 | } |
325 | //The maxRevision may have changed meanwhile if the entity created sub-entities | 243 | }; |
326 | const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; | ||
327 | 244 | ||
328 | // Add metadata buffer | 245 | d->revisionChanged = true; |
329 | flatbuffers::FlatBufferBuilder metadataFbb; | 246 | if (!d->entityStore.modify(bufferType, diff, deletions, replayToSource, preprocess)) { |
330 | { | 247 | return KAsync::error<qint64>(0); |
331 | //We add availableProperties to account for the properties that have been changed by the preprocessors | ||
332 | auto modifiedProperties = BufferUtils::toVector(metadataFbb, changeset + newAdaptor->changedProperties()); | ||
333 | auto metadataBuilder = MetadataBuilder(metadataFbb); | ||
334 | metadataBuilder.add_revision(newRevision); | ||
335 | metadataBuilder.add_operation(Operation_Modification); | ||
336 | metadataBuilder.add_replayToSource(replayToSource); | ||
337 | metadataBuilder.add_modifiedProperties(modifiedProperties); | ||
338 | auto metadataBuffer = metadataBuilder.Finish(); | ||
339 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
340 | } | 248 | } |
341 | 249 | ||
342 | flatbuffers::FlatBufferBuilder fbb; | 250 | return KAsync::value(d->entityStore.maxRevision()); |
343 | adaptorFactory->createBuffer(newAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | ||
344 | |||
345 | d->storeNewRevision(newRevision, fbb, bufferType, key); | ||
346 | return KAsync::value(newRevision); | ||
347 | } | 251 | } |
348 | 252 | ||
349 | KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | 253 | KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) |
@@ -364,106 +268,38 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
364 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); | 268 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); |
365 | SinkTrace() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; | 269 | SinkTrace() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; |
366 | 270 | ||
367 | bool found = false; | 271 | auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity) { |
368 | bool alreadyRemoved = false; | 272 | foreach (const auto &processor, d->processors[bufferType]) { |
369 | DataStore::mainDatabase(d->transaction, bufferType) | 273 | processor->deletedEntity(oldEntity); |
370 | .findLatest(key, | 274 | } |
371 | [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { | 275 | }; |
372 | auto entity = GetEntity(data.data()); | ||
373 | if (entity && entity->metadata()) { | ||
374 | auto metadata = GetMetadata(entity->metadata()->Data()); | ||
375 | found = true; | ||
376 | if (metadata->operation() == Operation_Removal) { | ||
377 | alreadyRemoved = true; | ||
378 | } | ||
379 | } | ||
380 | return false; | ||
381 | }, | ||
382 | [](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; }); | ||
383 | |||
384 | if (!found) { | ||
385 | SinkWarning() << "Failed to find entity " << key; | ||
386 | return KAsync::error<qint64>(0); | ||
387 | } | ||
388 | if (alreadyRemoved) { | ||
389 | SinkWarning() << "Entity is already removed " << key; | ||
390 | return KAsync::error<qint64>(0); | ||
391 | } | ||
392 | |||
393 | const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; | ||
394 | |||
395 | // Add metadata buffer | ||
396 | flatbuffers::FlatBufferBuilder metadataFbb; | ||
397 | auto metadataBuilder = MetadataBuilder(metadataFbb); | ||
398 | metadataBuilder.add_revision(newRevision); | ||
399 | metadataBuilder.add_operation(Operation_Removal); | ||
400 | metadataBuilder.add_replayToSource(replayToSource); | ||
401 | auto metadataBuffer = metadataBuilder.Finish(); | ||
402 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
403 | |||
404 | flatbuffers::FlatBufferBuilder fbb; | ||
405 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); | ||
406 | 276 | ||
407 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); | 277 | d->revisionChanged = true; |
408 | if (!adaptorFactory) { | 278 | if (!d->entityStore.remove(bufferType, key, replayToSource, preprocess)) { |
409 | SinkWarning() << "no adaptor factory for type " << bufferType; | ||
410 | return KAsync::error<qint64>(0); | 279 | return KAsync::error<qint64>(0); |
411 | } | 280 | } |
412 | 281 | ||
413 | QSharedPointer<ApplicationDomain::BufferAdaptor> current; | 282 | return KAsync::value(d->entityStore.maxRevision()); |
414 | DataStore::mainDatabase(d->transaction, bufferType) | ||
415 | .findLatest(key, | ||
416 | [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { | ||
417 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
418 | if (!buffer.isValid()) { | ||
419 | SinkWarning() << "Read invalid buffer from disk"; | ||
420 | } else { | ||
421 | current = adaptorFactory->createAdaptor(buffer.entity()); | ||
422 | } | ||
423 | return false; | ||
424 | }, | ||
425 | [this](const DataStore::Error &error) { SinkError() << "Failed to find value in pipeline: " << error.message; }); | ||
426 | |||
427 | d->storeNewRevision(newRevision, fbb, bufferType, key); | ||
428 | |||
429 | foreach (const auto &processor, d->processors[bufferType]) { | ||
430 | processor->deletedEntity(key, newRevision, *current, d->transaction); | ||
431 | } | ||
432 | |||
433 | return KAsync::value(newRevision); | ||
434 | } | 283 | } |
435 | 284 | ||
436 | void Pipeline::cleanupRevision(qint64 revision) | 285 | void Pipeline::cleanupRevision(qint64 revision) |
437 | { | 286 | { |
287 | d->entityStore.cleanupRevision(revision); | ||
438 | d->revisionChanged = true; | 288 | d->revisionChanged = true; |
439 | const auto uid = DataStore::getUidFromRevision(d->transaction, revision); | ||
440 | const auto bufferType = DataStore::getTypeFromRevision(d->transaction, revision); | ||
441 | SinkTrace() << "Cleaning up revision " << revision << uid << bufferType; | ||
442 | DataStore::mainDatabase(d->transaction, bufferType) | ||
443 | .scan(uid, | ||
444 | [&](const QByteArray &key, const QByteArray &data) -> bool { | ||
445 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
446 | if (!buffer.isValid()) { | ||
447 | SinkWarning() << "Read invalid buffer from disk"; | ||
448 | } else { | ||
449 | const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer()); | ||
450 | const qint64 rev = metadata->revision(); | ||
451 | // Remove old revisions, and the current if the entity has already been removed | ||
452 | if (rev < revision || metadata->operation() == Operation_Removal) { | ||
453 | DataStore::removeRevision(d->transaction, rev); | ||
454 | DataStore::mainDatabase(d->transaction, bufferType).remove(key); | ||
455 | } | ||
456 | } | ||
457 | |||
458 | return true; | ||
459 | }, | ||
460 | [](const DataStore::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true); | ||
461 | DataStore::setCleanedUpRevision(d->transaction, revision); | ||
462 | } | 289 | } |
463 | 290 | ||
464 | qint64 Pipeline::cleanedUpRevision() | 291 | qint64 Pipeline::cleanedUpRevision() |
465 | { | 292 | { |
466 | return DataStore::cleanedUpRevision(d->transaction); | 293 | /* return d->entityStore.cleanedUpRevision(); */ |
294 | /* return DataStore::cleanedUpRevision(d->transaction); */ | ||
295 | //FIXME Just move the whole cleanup revision iteration into the entitystore | ||
296 | return 0; | ||
297 | } | ||
298 | |||
299 | qint64 Pipeline::revision() | ||
300 | { | ||
301 | //FIXME Just move the whole cleanup revision iteration into the entitystore | ||
302 | return 0; | ||
467 | } | 303 | } |
468 | 304 | ||
469 | class Preprocessor::Private { | 305 | class Preprocessor::Private { |
@@ -492,7 +328,7 @@ void Preprocessor::startBatch() | |||
492 | { | 328 | { |
493 | } | 329 | } |
494 | 330 | ||
495 | void Preprocessor::finalize() | 331 | void Preprocessor::finalizeBatch() |
496 | { | 332 | { |
497 | } | 333 | } |
498 | 334 | ||
@@ -510,7 +346,6 @@ void Preprocessor::createEntity(const Sink::ApplicationDomain::ApplicationDomain | |||
510 | 346 | ||
511 | flatbuffers::FlatBufferBuilder fbb; | 347 | flatbuffers::FlatBufferBuilder fbb; |
512 | auto entityId = fbb.CreateString(entity.identifier()); | 348 | auto entityId = fbb.CreateString(entity.identifier()); |
513 | // This is the resource buffer type and not the domain type | ||
514 | auto type = fbb.CreateString(typeName); | 349 | auto type = fbb.CreateString(typeName); |
515 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityBuffer.constData(), entityBuffer.size()); | 350 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityBuffer.constData(), entityBuffer.size()); |
516 | auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta); | 351 | auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta); |
diff --git a/common/pipeline.h b/common/pipeline.h index bf94017..0461507 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -31,7 +31,8 @@ | |||
31 | 31 | ||
32 | #include <Async/Async> | 32 | #include <Async/Async> |
33 | 33 | ||
34 | #include "domainadaptor.h" | 34 | #include <bufferadaptor.h> |
35 | #include <resourcecontext.h> | ||
35 | 36 | ||
36 | namespace Sink { | 37 | namespace Sink { |
37 | 38 | ||
@@ -45,16 +46,14 @@ public: | |||
45 | Pipeline(const ResourceContext &context); | 46 | Pipeline(const ResourceContext &context); |
46 | ~Pipeline(); | 47 | ~Pipeline(); |
47 | 48 | ||
48 | Storage::DataStore &storage() const; | ||
49 | |||
50 | void setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &preprocessors); | 49 | void setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &preprocessors); |
51 | void startTransaction(); | 50 | void startTransaction(); |
52 | void commit(); | 51 | void commit(); |
53 | Storage::DataStore::Transaction &transaction(); | ||
54 | 52 | ||
55 | KAsync::Job<qint64> newEntity(void const *command, size_t size); | 53 | KAsync::Job<qint64> newEntity(void const *command, size_t size); |
56 | KAsync::Job<qint64> modifiedEntity(void const *command, size_t size); | 54 | KAsync::Job<qint64> modifiedEntity(void const *command, size_t size); |
57 | KAsync::Job<qint64> deletedEntity(void const *command, size_t size); | 55 | KAsync::Job<qint64> deletedEntity(void const *command, size_t size); |
56 | |||
58 | /* | 57 | /* |
59 | * Cleans up a single revision. | 58 | * Cleans up a single revision. |
60 | * | 59 | * |
@@ -66,6 +65,7 @@ public: | |||
66 | * Returns the latest cleaned up revision. | 65 | * Returns the latest cleaned up revision. |
67 | */ | 66 | */ |
68 | qint64 cleanedUpRevision(); | 67 | qint64 cleanedUpRevision(); |
68 | qint64 revision(); | ||
69 | 69 | ||
70 | signals: | 70 | signals: |
71 | void revisionUpdated(qint64); | 71 | void revisionUpdated(qint64); |
@@ -82,11 +82,10 @@ public: | |||
82 | virtual ~Preprocessor(); | 82 | virtual ~Preprocessor(); |
83 | 83 | ||
84 | virtual void startBatch(); | 84 | virtual void startBatch(); |
85 | virtual void newEntity(const QByteArray &uid, qint64 revision, ApplicationDomain::BufferAdaptor &newEntity, Storage::DataStore::Transaction &transaction) {}; | 85 | virtual void newEntity(ApplicationDomain::ApplicationDomainType &newEntity) {}; |
86 | virtual void modifiedEntity(const QByteArray &uid, qint64 revision, const ApplicationDomain::BufferAdaptor &oldEntity, | 86 | virtual void modifiedEntity(const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) {}; |
87 | ApplicationDomain::BufferAdaptor &newEntity, Storage::DataStore::Transaction &transaction) {}; | 87 | virtual void deletedEntity(const ApplicationDomain::ApplicationDomainType &oldEntity) {}; |
88 | virtual void deletedEntity(const QByteArray &uid, qint64 revision, const ApplicationDomain::BufferAdaptor &oldEntity, Storage::DataStore::Transaction &transaction) {}; | 88 | virtual void finalizeBatch(); |
89 | virtual void finalize(); | ||
90 | 89 | ||
91 | void setup(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Pipeline *); | 90 | void setup(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Pipeline *); |
92 | 91 | ||
@@ -110,27 +109,28 @@ template<typename DomainType> | |||
110 | class SINK_EXPORT EntityPreprocessor: public Preprocessor | 109 | class SINK_EXPORT EntityPreprocessor: public Preprocessor |
111 | { | 110 | { |
112 | public: | 111 | public: |
113 | virtual void newEntity(DomainType &, Storage::DataStore::Transaction &transaction) {}; | 112 | virtual void newEntity(DomainType &) {}; |
114 | virtual void modifiedEntity(const DomainType &oldEntity, DomainType &newEntity, Storage::DataStore::Transaction &transaction) {}; | 113 | virtual void modifiedEntity(const DomainType &oldEntity, DomainType &newEntity) {}; |
115 | virtual void deletedEntity(const DomainType &oldEntity, Storage::DataStore::Transaction &transaction) {}; | 114 | virtual void deletedEntity(const DomainType &oldEntity) {}; |
116 | 115 | ||
117 | private: | 116 | private: |
118 | static void nullDeleter(ApplicationDomain::BufferAdaptor *) {} | 117 | virtual void newEntity(ApplicationDomain::ApplicationDomainType &newEntity_) Q_DECL_OVERRIDE |
119 | virtual void newEntity(const QByteArray &uid, qint64 revision, ApplicationDomain::BufferAdaptor &bufferAdaptor, Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | ||
120 | { | 118 | { |
121 | auto o = DomainType("", uid, revision, QSharedPointer<ApplicationDomain::BufferAdaptor>(&bufferAdaptor, nullDeleter)); | 119 | //Modifications still work due to the underlying shared adaptor |
122 | newEntity(o, transaction); | 120 | auto newEntityCopy = DomainType(newEntity_); |
121 | newEntity(newEntityCopy); | ||
123 | } | 122 | } |
124 | 123 | ||
125 | virtual void modifiedEntity(const QByteArray &uid, qint64 revision, const ApplicationDomain::BufferAdaptor &oldEntity, | 124 | virtual void modifiedEntity(const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity_) Q_DECL_OVERRIDE |
126 | ApplicationDomain::BufferAdaptor &bufferAdaptor, Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | ||
127 | { | 125 | { |
128 | auto o = DomainType("", uid, revision, QSharedPointer<ApplicationDomain::BufferAdaptor>(&bufferAdaptor, nullDeleter)); | 126 | //Modifications still work due to the underlying shared adaptor |
129 | modifiedEntity(DomainType("", uid, 0, QSharedPointer<ApplicationDomain::BufferAdaptor>(const_cast<ApplicationDomain::BufferAdaptor*>(&oldEntity), nullDeleter)), o, transaction); | 127 | auto newEntityCopy = DomainType(newEntity_); |
128 | modifiedEntity(DomainType(oldEntity), newEntityCopy); | ||
130 | } | 129 | } |
131 | virtual void deletedEntity(const QByteArray &uid, qint64 revision, const ApplicationDomain::BufferAdaptor &bufferAdaptor, Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | 130 | |
131 | virtual void deletedEntity(const ApplicationDomain::ApplicationDomainType &oldEntity) Q_DECL_OVERRIDE | ||
132 | { | 132 | { |
133 | deletedEntity(DomainType("", uid, revision, QSharedPointer<ApplicationDomain::BufferAdaptor>(const_cast<ApplicationDomain::BufferAdaptor*>(&bufferAdaptor), nullDeleter)), transaction); | 133 | deletedEntity(DomainType(oldEntity)); |
134 | } | 134 | } |
135 | }; | 135 | }; |
136 | 136 | ||
diff --git a/common/specialpurposepreprocessor.cpp b/common/specialpurposepreprocessor.cpp index 0fd8e34..920f78a 100644 --- a/common/specialpurposepreprocessor.cpp +++ b/common/specialpurposepreprocessor.cpp | |||
@@ -46,11 +46,11 @@ QByteArray getSpecialPurposeType(const QString &name) | |||
46 | 46 | ||
47 | SpecialPurposeProcessor::SpecialPurposeProcessor(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier) {} | 47 | SpecialPurposeProcessor::SpecialPurposeProcessor(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier) {} |
48 | 48 | ||
49 | QByteArray SpecialPurposeProcessor::ensureFolder(Sink::Storage::DataStore::Transaction &transaction, const QByteArray &specialPurpose) | 49 | QByteArray SpecialPurposeProcessor::ensureFolder(const QByteArray &specialPurpose) |
50 | { | 50 | { |
51 | /* if (!mSpecialPurposeFolders.contains(specialPurpose)) { */ | 51 | /* if (!mSpecialPurposeFolders.contains(specialPurpose)) { */ |
52 | /* //Try to find an existing drafts folder */ | 52 | /* //Try to find an existing drafts folder */ |
53 | /* Sink::EntityReader<ApplicationDomain::Folder> reader(mResourceType, mResourceInstanceIdentifier, transaction); */ | 53 | /* Sink::EntityReader<ApplicationDomain::Folder> reader(mResourceType, mResourceInstanceIdentifier); */ |
54 | /* reader.query(Sink::Query().filter<ApplicationDomain::Folder::SpecialPurpose>(Query::Comparator(specialPurpose, Query::Comparator::Contains)), */ | 54 | /* reader.query(Sink::Query().filter<ApplicationDomain::Folder::SpecialPurpose>(Query::Comparator(specialPurpose, Query::Comparator::Contains)), */ |
55 | /* [this, specialPurpose](const ApplicationDomain::Folder &f) -> bool{ */ | 55 | /* [this, specialPurpose](const ApplicationDomain::Folder &f) -> bool{ */ |
56 | /* mSpecialPurposeFolders.insert(specialPurpose, f.identifier()); */ | 56 | /* mSpecialPurposeFolders.insert(specialPurpose, f.identifier()); */ |
@@ -70,23 +70,23 @@ QByteArray SpecialPurposeProcessor::ensureFolder(Sink::Storage::DataStore::Trans | |||
70 | return mSpecialPurposeFolders.value(specialPurpose); | 70 | return mSpecialPurposeFolders.value(specialPurpose); |
71 | } | 71 | } |
72 | 72 | ||
73 | void SpecialPurposeProcessor::moveToFolder(Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) | 73 | void SpecialPurposeProcessor::moveToFolder(Sink::ApplicationDomain::ApplicationDomainType &newEntity) |
74 | { | 74 | { |
75 | if (newEntity.getProperty("trash").toBool()) { | 75 | if (newEntity.getProperty("trash").toBool()) { |
76 | newEntity.setProperty("folder", ensureFolder(transaction, "trash")); | 76 | newEntity.setProperty("folder", ensureFolder("trash")); |
77 | return; | 77 | return; |
78 | } | 78 | } |
79 | if (newEntity.getProperty("draft").toBool()) { | 79 | if (newEntity.getProperty("draft").toBool()) { |
80 | newEntity.setProperty("folder", ensureFolder(transaction, "drafts")); | 80 | newEntity.setProperty("folder", ensureFolder("drafts")); |
81 | } | 81 | } |
82 | } | 82 | } |
83 | 83 | ||
84 | void SpecialPurposeProcessor::newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) | 84 | void SpecialPurposeProcessor::newEntity(Sink::ApplicationDomain::ApplicationDomainType &newEntity) |
85 | { | 85 | { |
86 | moveToFolder(newEntity, transaction); | 86 | moveToFolder(newEntity); |
87 | } | 87 | } |
88 | 88 | ||
89 | void SpecialPurposeProcessor::modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) | 89 | void SpecialPurposeProcessor::modifiedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity, Sink::ApplicationDomain::ApplicationDomainType &newEntity) |
90 | { | 90 | { |
91 | moveToFolder(newEntity, transaction); | 91 | moveToFolder(newEntity); |
92 | } | 92 | } |
diff --git a/common/specialpurposepreprocessor.h b/common/specialpurposepreprocessor.h index 8b2d9e9..f2aeb20 100644 --- a/common/specialpurposepreprocessor.h +++ b/common/specialpurposepreprocessor.h | |||
@@ -30,12 +30,12 @@ class SINK_EXPORT SpecialPurposeProcessor : public Sink::Preprocessor | |||
30 | public: | 30 | public: |
31 | SpecialPurposeProcessor(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier); | 31 | SpecialPurposeProcessor(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier); |
32 | 32 | ||
33 | QByteArray ensureFolder(Sink::Storage::DataStore::Transaction &transaction, const QByteArray &specialPurpose); | 33 | QByteArray ensureFolder(const QByteArray &specialPurpose); |
34 | 34 | ||
35 | void moveToFolder(Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction); | 35 | void moveToFolder(Sink::ApplicationDomain::ApplicationDomainType &newEntity); |
36 | 36 | ||
37 | void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE; | 37 | void newEntity(Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE; |
38 | void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE; | 38 | void modifiedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity, Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE; |
39 | 39 | ||
40 | QHash<QByteArray, QByteArray> mSpecialPurposeFolders; | 40 | QHash<QByteArray, QByteArray> mSpecialPurposeFolders; |
41 | QByteArray mResourceType; | 41 | QByteArray mResourceType; |
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp index fe63f0b..30c7a71 100644 --- a/common/storage/entitystore.cpp +++ b/common/storage/entitystore.cpp | |||
@@ -25,6 +25,8 @@ | |||
25 | #include "definitions.h" | 25 | #include "definitions.h" |
26 | #include "resourcecontext.h" | 26 | #include "resourcecontext.h" |
27 | #include "index.h" | 27 | #include "index.h" |
28 | #include "bufferutils.h" | ||
29 | #include "entity_generated.h" | ||
28 | 30 | ||
29 | #include "mail.h" | 31 | #include "mail.h" |
30 | #include "folder.h" | 32 | #include "folder.h" |
@@ -108,16 +110,199 @@ void EntityStore::startTransaction(Sink::Storage::DataStore::AccessMode accessMo | |||
108 | 110 | ||
109 | void EntityStore::commitTransaction() | 111 | void EntityStore::commitTransaction() |
110 | { | 112 | { |
113 | SinkTrace() << "Committing transaction"; | ||
111 | d->transaction.commit(); | 114 | d->transaction.commit(); |
112 | d->transaction = Storage::DataStore::Transaction(); | 115 | d->transaction = Storage::DataStore::Transaction(); |
113 | } | 116 | } |
114 | 117 | ||
115 | void EntityStore::abortTransaction() | 118 | void EntityStore::abortTransaction() |
116 | { | 119 | { |
120 | SinkTrace() << "Aborting transaction"; | ||
117 | d->transaction.abort(); | 121 | d->transaction.abort(); |
118 | d->transaction = Storage::DataStore::Transaction(); | 122 | d->transaction = Storage::DataStore::Transaction(); |
119 | } | 123 | } |
120 | 124 | ||
125 | bool EntityStore::add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &entity_, bool replayToSource, const PreprocessCreation &preprocess) | ||
126 | { | ||
127 | if (entity_.identifier().isEmpty()) { | ||
128 | SinkWarning() << "Can't write entity with an empty identifier"; | ||
129 | return false; | ||
130 | } | ||
131 | |||
132 | auto entity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(entity_, entity_.availableProperties()); | ||
133 | entity.setChangedProperties(entity.availableProperties().toSet()); | ||
134 | |||
135 | preprocess(entity); | ||
136 | d->typeIndex(type).add(entity.identifier(), entity, d->transaction); | ||
137 | |||
138 | //The maxRevision may have changed meanwhile if the entity created sub-entities | ||
139 | const qint64 newRevision = maxRevision() + 1; | ||
140 | |||
141 | // Add metadata buffer | ||
142 | flatbuffers::FlatBufferBuilder metadataFbb; | ||
143 | auto metadataBuilder = MetadataBuilder(metadataFbb); | ||
144 | metadataBuilder.add_revision(newRevision); | ||
145 | metadataBuilder.add_operation(Operation_Creation); | ||
146 | metadataBuilder.add_replayToSource(replayToSource); | ||
147 | auto metadataBuffer = metadataBuilder.Finish(); | ||
148 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
149 | |||
150 | flatbuffers::FlatBufferBuilder fbb; | ||
151 | d->resourceContext.adaptorFactory(type).createBuffer(entity, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | ||
152 | |||
153 | DataStore::mainDatabase(d->transaction, type) | ||
154 | .write(DataStore::assembleKey(entity.identifier(), newRevision), BufferUtils::extractBuffer(fbb), | ||
155 | [&](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << entity.identifier() << newRevision; }); | ||
156 | DataStore::setMaxRevision(d->transaction, newRevision); | ||
157 | DataStore::recordRevision(d->transaction, newRevision, entity.identifier(), type); | ||
158 | SinkTrace() << "Wrote entity: " << entity.identifier() << type << newRevision; | ||
159 | return true; | ||
160 | } | ||
161 | |||
162 | bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource, const PreprocessModification &preprocess) | ||
163 | { | ||
164 | auto changeset = diff.changedProperties(); | ||
165 | //TODO handle errors | ||
166 | const auto current = readLatest(type, diff.identifier()); | ||
167 | if (current.identifier().isEmpty()) { | ||
168 | SinkWarning() << "Failed to read current version: " << diff.identifier(); | ||
169 | return false; | ||
170 | } | ||
171 | |||
172 | auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(current, current.availableProperties()); | ||
173 | |||
174 | // Apply diff | ||
175 | //SinkTrace() << "Applying changed properties: " << changeset; | ||
176 | for (const auto &property : changeset) { | ||
177 | const auto value = diff.getProperty(property); | ||
178 | if (value.isValid()) { | ||
179 | //SinkTrace() << "Setting property: " << property; | ||
180 | newEntity.setProperty(property, value); | ||
181 | } | ||
182 | } | ||
183 | |||
184 | // Remove deletions | ||
185 | for (const auto property : deletions) { | ||
186 | //SinkTrace() << "Removing property: " << property; | ||
187 | newEntity.setProperty(property, QVariant()); | ||
188 | } | ||
189 | |||
190 | preprocess(current, newEntity); | ||
191 | d->typeIndex(type).remove(current.identifier(), current, d->transaction); | ||
192 | d->typeIndex(type).add(newEntity.identifier(), newEntity, d->transaction); | ||
193 | |||
194 | const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; | ||
195 | |||
196 | // Add metadata buffer | ||
197 | flatbuffers::FlatBufferBuilder metadataFbb; | ||
198 | { | ||
199 | //We add availableProperties to account for the properties that have been changed by the preprocessors | ||
200 | auto modifiedProperties = BufferUtils::toVector(metadataFbb, changeset + newEntity.changedProperties()); | ||
201 | auto metadataBuilder = MetadataBuilder(metadataFbb); | ||
202 | metadataBuilder.add_revision(newRevision); | ||
203 | metadataBuilder.add_operation(Operation_Modification); | ||
204 | metadataBuilder.add_replayToSource(replayToSource); | ||
205 | metadataBuilder.add_modifiedProperties(modifiedProperties); | ||
206 | auto metadataBuffer = metadataBuilder.Finish(); | ||
207 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
208 | } | ||
209 | |||
210 | newEntity.setChangedProperties(newEntity.availableProperties().toSet()); | ||
211 | SinkTrace() << "All properties: " << newEntity.availableProperties(); | ||
212 | |||
213 | flatbuffers::FlatBufferBuilder fbb; | ||
214 | d->resourceContext.adaptorFactory(type).createBuffer(newEntity, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | ||
215 | |||
216 | DataStore::mainDatabase(d->transaction, type) | ||
217 | .write(DataStore::assembleKey(newEntity.identifier(), newRevision), BufferUtils::extractBuffer(fbb), | ||
218 | [&](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << newEntity.identifier() << newRevision; }); | ||
219 | DataStore::setMaxRevision(d->transaction, newRevision); | ||
220 | DataStore::recordRevision(d->transaction, newRevision, newEntity.identifier(), type); | ||
221 | SinkTrace() << "Wrote modified entity: " << newEntity.identifier() << type << newRevision; | ||
222 | return true; | ||
223 | } | ||
224 | |||
225 | bool EntityStore::remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &preprocess) | ||
226 | { | ||
227 | bool found = false; | ||
228 | bool alreadyRemoved = false; | ||
229 | DataStore::mainDatabase(d->transaction, type) | ||
230 | .findLatest(uid, | ||
231 | [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { | ||
232 | auto entity = GetEntity(data.data()); | ||
233 | if (entity && entity->metadata()) { | ||
234 | auto metadata = GetMetadata(entity->metadata()->Data()); | ||
235 | found = true; | ||
236 | if (metadata->operation() == Operation_Removal) { | ||
237 | alreadyRemoved = true; | ||
238 | } | ||
239 | } | ||
240 | return false; | ||
241 | }, | ||
242 | [](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; }); | ||
243 | |||
244 | if (!found) { | ||
245 | SinkWarning() << "Failed to find entity " << uid; | ||
246 | return false; | ||
247 | } | ||
248 | if (alreadyRemoved) { | ||
249 | SinkWarning() << "Entity is already removed " << uid; | ||
250 | return false; | ||
251 | } | ||
252 | |||
253 | const auto current = readLatest(type, uid); | ||
254 | preprocess(current); | ||
255 | d->typeIndex(type).remove(current.identifier(), current, d->transaction); | ||
256 | |||
257 | const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; | ||
258 | |||
259 | // Add metadata buffer | ||
260 | flatbuffers::FlatBufferBuilder metadataFbb; | ||
261 | auto metadataBuilder = MetadataBuilder(metadataFbb); | ||
262 | metadataBuilder.add_revision(newRevision); | ||
263 | metadataBuilder.add_operation(Operation_Removal); | ||
264 | metadataBuilder.add_replayToSource(replayToSource); | ||
265 | auto metadataBuffer = metadataBuilder.Finish(); | ||
266 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
267 | |||
268 | flatbuffers::FlatBufferBuilder fbb; | ||
269 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); | ||
270 | |||
271 | DataStore::mainDatabase(d->transaction, type) | ||
272 | .write(DataStore::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), | ||
273 | [&](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; }); | ||
274 | DataStore::setMaxRevision(d->transaction, newRevision); | ||
275 | DataStore::recordRevision(d->transaction, newRevision, uid, type); | ||
276 | return true; | ||
277 | } | ||
278 | |||
279 | void EntityStore::cleanupRevision(qint64 revision) | ||
280 | { | ||
281 | const auto uid = DataStore::getUidFromRevision(d->transaction, revision); | ||
282 | const auto bufferType = DataStore::getTypeFromRevision(d->transaction, revision); | ||
283 | SinkTrace() << "Cleaning up revision " << revision << uid << bufferType; | ||
284 | DataStore::mainDatabase(d->transaction, bufferType) | ||
285 | .scan(uid, | ||
286 | [&](const QByteArray &key, const QByteArray &data) -> bool { | ||
287 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
288 | if (!buffer.isValid()) { | ||
289 | SinkWarning() << "Read invalid buffer from disk"; | ||
290 | } else { | ||
291 | const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer()); | ||
292 | const qint64 rev = metadata->revision(); | ||
293 | // Remove old revisions, and the current if the entity has already been removed | ||
294 | if (rev < revision || metadata->operation() == Operation_Removal) { | ||
295 | DataStore::removeRevision(d->transaction, rev); | ||
296 | DataStore::mainDatabase(d->transaction, bufferType).remove(key); | ||
297 | } | ||
298 | } | ||
299 | |||
300 | return true; | ||
301 | }, | ||
302 | [](const DataStore::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true); | ||
303 | DataStore::setCleanedUpRevision(d->transaction, revision); | ||
304 | } | ||
305 | |||
121 | QVector<QByteArray> EntityStore::fullScan(const QByteArray &type) | 306 | QVector<QByteArray> EntityStore::fullScan(const QByteArray &type) |
122 | { | 307 | { |
123 | SinkTrace() << "Looking for : " << type; | 308 | SinkTrace() << "Looking for : " << type; |
diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h index 455e9c3..65bff50 100644 --- a/common/storage/entitystore.h +++ b/common/storage/entitystore.h | |||
@@ -38,9 +38,14 @@ public: | |||
38 | typedef QSharedPointer<EntityStore> Ptr; | 38 | typedef QSharedPointer<EntityStore> Ptr; |
39 | EntityStore(const ResourceContext &resourceContext); | 39 | EntityStore(const ResourceContext &resourceContext); |
40 | 40 | ||
41 | void add(const ApplicationDomain::ApplicationDomainType &); | 41 | typedef std::function<void(const ApplicationDomain::ApplicationDomainType &, ApplicationDomain::ApplicationDomainType &)> PreprocessModification; |
42 | void modify(const ApplicationDomain::ApplicationDomainType &); | 42 | typedef std::function<void(ApplicationDomain::ApplicationDomainType &)> PreprocessCreation; |
43 | void remove(const ApplicationDomain::ApplicationDomainType &); | 43 | typedef std::function<void(const ApplicationDomain::ApplicationDomainType &)> PreprocessRemoval; |
44 | |||
45 | bool add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &, bool replayToSource, const PreprocessCreation &); | ||
46 | bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &, const QByteArrayList &deletions, bool replayToSource, const PreprocessModification &); | ||
47 | bool remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &); | ||
48 | void cleanupRevision(qint64 revision); | ||
44 | 49 | ||
45 | void startTransaction(Sink::Storage::DataStore::AccessMode); | 50 | void startTransaction(Sink::Storage::DataStore::AccessMode); |
46 | void commitTransaction(); | 51 | void commitTransaction(); |
diff --git a/common/typeindex.cpp b/common/typeindex.cpp index 64c2a01..7920efc 100644 --- a/common/typeindex.cpp +++ b/common/typeindex.cpp | |||
@@ -108,30 +108,30 @@ void TypeIndex::addPropertyWithSorting<QByteArray, QDateTime>(const QByteArray & | |||
108 | mSortedProperties.insert(property, sortProperty); | 108 | mSortedProperties.insert(property, sortProperty); |
109 | } | 109 | } |
110 | 110 | ||
111 | void TypeIndex::add(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) | 111 | void TypeIndex::add(const QByteArray &identifier, const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Storage::DataStore::Transaction &transaction) |
112 | { | 112 | { |
113 | for (const auto &property : mProperties) { | 113 | for (const auto &property : mProperties) { |
114 | const auto value = bufferAdaptor.getProperty(property); | 114 | const auto value = entity.getProperty(property); |
115 | auto indexer = mIndexer.value(property); | 115 | auto indexer = mIndexer.value(property); |
116 | indexer(identifier, value, transaction); | 116 | indexer(identifier, value, transaction); |
117 | } | 117 | } |
118 | for (auto it = mSortedProperties.constBegin(); it != mSortedProperties.constEnd(); it++) { | 118 | for (auto it = mSortedProperties.constBegin(); it != mSortedProperties.constEnd(); it++) { |
119 | const auto value = bufferAdaptor.getProperty(it.key()); | 119 | const auto value = entity.getProperty(it.key()); |
120 | const auto sortValue = bufferAdaptor.getProperty(it.value()); | 120 | const auto sortValue = entity.getProperty(it.value()); |
121 | auto indexer = mSortIndexer.value(it.key() + it.value()); | 121 | auto indexer = mSortIndexer.value(it.key() + it.value()); |
122 | indexer(identifier, value, sortValue, transaction); | 122 | indexer(identifier, value, sortValue, transaction); |
123 | } | 123 | } |
124 | } | 124 | } |
125 | 125 | ||
126 | void TypeIndex::remove(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) | 126 | void TypeIndex::remove(const QByteArray &identifier, const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Storage::DataStore::Transaction &transaction) |
127 | { | 127 | { |
128 | for (const auto &property : mProperties) { | 128 | for (const auto &property : mProperties) { |
129 | const auto value = bufferAdaptor.getProperty(property); | 129 | const auto value = entity.getProperty(property); |
130 | Index(indexName(property), transaction).remove(getByteArray(value), identifier); | 130 | Index(indexName(property), transaction).remove(getByteArray(value), identifier); |
131 | } | 131 | } |
132 | for (auto it = mSortedProperties.constBegin(); it != mSortedProperties.constEnd(); it++) { | 132 | for (auto it = mSortedProperties.constBegin(); it != mSortedProperties.constEnd(); it++) { |
133 | const auto propertyValue = bufferAdaptor.getProperty(it.key()); | 133 | const auto propertyValue = entity.getProperty(it.key()); |
134 | const auto sortValue = bufferAdaptor.getProperty(it.value()); | 134 | const auto sortValue = entity.getProperty(it.value()); |
135 | if (sortValue.type() == QVariant::DateTime) { | 135 | if (sortValue.type() == QVariant::DateTime) { |
136 | Index(indexName(it.key(), it.value()), transaction).remove(propertyValue.toByteArray() + toSortableByteArray(sortValue.toDateTime()), identifier); | 136 | Index(indexName(it.key(), it.value()), transaction).remove(propertyValue.toByteArray() + toSortableByteArray(sortValue.toDateTime()), identifier); |
137 | } else { | 137 | } else { |
diff --git a/common/typeindex.h b/common/typeindex.h index 2638577..e11e673 100644 --- a/common/typeindex.h +++ b/common/typeindex.h | |||
@@ -19,7 +19,6 @@ | |||
19 | #pragma once | 19 | #pragma once |
20 | 20 | ||
21 | #include "resultset.h" | 21 | #include "resultset.h" |
22 | #include "bufferadaptor.h" | ||
23 | #include "storage.h" | 22 | #include "storage.h" |
24 | #include "query.h" | 23 | #include "query.h" |
25 | #include "log.h" | 24 | #include "log.h" |
@@ -52,8 +51,8 @@ public: | |||
52 | { | 51 | { |
53 | mSecondaryProperties.insert(Left::name, Right::name); | 52 | mSecondaryProperties.insert(Left::name, Right::name); |
54 | } | 53 | } |
55 | void add(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); | 54 | void add(const QByteArray &identifier, const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Storage::DataStore::Transaction &transaction); |
56 | void remove(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); | 55 | void remove(const QByteArray &identifier, const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Storage::DataStore::Transaction &transaction); |
57 | 56 | ||
58 | QVector<QByteArray> query(const Sink::Query &query, QSet<QByteArray> &appliedFilters, QByteArray &appliedSorting, Sink::Storage::DataStore::Transaction &transaction); | 57 | QVector<QByteArray> query(const Sink::Query &query, QSet<QByteArray> &appliedFilters, QByteArray &appliedSorting, Sink::Storage::DataStore::Transaction &transaction); |
59 | QVector<QByteArray> lookup(const QByteArray &property, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction); | 58 | QVector<QByteArray> lookup(const QByteArray &property, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction); |
diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp index e288be2..5513986 100644 --- a/examples/dummyresource/resourcefactory.cpp +++ b/examples/dummyresource/resourcefactory.cpp | |||
@@ -35,7 +35,6 @@ | |||
35 | #include "dummystore.h" | 35 | #include "dummystore.h" |
36 | #include "definitions.h" | 36 | #include "definitions.h" |
37 | #include "facadefactory.h" | 37 | #include "facadefactory.h" |
38 | #include "indexupdater.h" | ||
39 | #include "adaptorfactoryregistry.h" | 38 | #include "adaptorfactoryregistry.h" |
40 | #include "synchronizer.h" | 39 | #include "synchronizer.h" |
41 | #include "mailpreprocessor.h" | 40 | #include "mailpreprocessor.h" |
@@ -135,11 +134,11 @@ DummyResource::DummyResource(const Sink::ResourceContext &resourceContext, const | |||
135 | setupSynchronizer(QSharedPointer<DummySynchronizer>::create(resourceContext)); | 134 | setupSynchronizer(QSharedPointer<DummySynchronizer>::create(resourceContext)); |
136 | setupChangereplay(QSharedPointer<Sink::NullChangeReplay>::create(resourceContext)); | 135 | setupChangereplay(QSharedPointer<Sink::NullChangeReplay>::create(resourceContext)); |
137 | setupPreprocessors(ENTITY_TYPE_MAIL, | 136 | setupPreprocessors(ENTITY_TYPE_MAIL, |
138 | QVector<Sink::Preprocessor*>() << new MailPropertyExtractor << new DefaultIndexUpdater<Sink::ApplicationDomain::Mail>); | 137 | QVector<Sink::Preprocessor*>() << new MailPropertyExtractor); |
139 | setupPreprocessors(ENTITY_TYPE_FOLDER, | 138 | setupPreprocessors(ENTITY_TYPE_FOLDER, |
140 | QVector<Sink::Preprocessor*>() << new DefaultIndexUpdater<Sink::ApplicationDomain::Folder>); | 139 | QVector<Sink::Preprocessor*>()); |
141 | setupPreprocessors(ENTITY_TYPE_EVENT, | 140 | setupPreprocessors(ENTITY_TYPE_EVENT, |
142 | QVector<Sink::Preprocessor*>() << new DefaultIndexUpdater<Sink::ApplicationDomain::Event>); | 141 | QVector<Sink::Preprocessor*>()); |
143 | } | 142 | } |
144 | 143 | ||
145 | DummyResource::~DummyResource() | 144 | DummyResource::~DummyResource() |
diff --git a/examples/imapresource/imapresource.cpp b/examples/imapresource/imapresource.cpp index 90dae7a..9656a04 100644 --- a/examples/imapresource/imapresource.cpp +++ b/examples/imapresource/imapresource.cpp | |||
@@ -33,7 +33,6 @@ | |||
33 | #include "domain/mail.h" | 33 | #include "domain/mail.h" |
34 | #include "definitions.h" | 34 | #include "definitions.h" |
35 | #include "facadefactory.h" | 35 | #include "facadefactory.h" |
36 | #include "indexupdater.h" | ||
37 | #include "inspection.h" | 36 | #include "inspection.h" |
38 | #include "synchronizer.h" | 37 | #include "synchronizer.h" |
39 | #include "sourcewriteback.h" | 38 | #include "sourcewriteback.h" |
@@ -527,8 +526,8 @@ ImapResource::ImapResource(const ResourceContext &resourceContext, const QShared | |||
527 | changereplay->mPassword = mPassword; | 526 | changereplay->mPassword = mPassword; |
528 | setupChangereplay(changereplay); | 527 | setupChangereplay(changereplay); |
529 | 528 | ||
530 | setupPreprocessors(ENTITY_TYPE_MAIL, QVector<Sink::Preprocessor*>() << new SpecialPurposeProcessor(resourceContext.resourceType, resourceContext.instanceId()) << new MimeMessageMover << new MailPropertyExtractor << new DefaultIndexUpdater<Sink::ApplicationDomain::Mail>); | 529 | setupPreprocessors(ENTITY_TYPE_MAIL, QVector<Sink::Preprocessor*>() << new SpecialPurposeProcessor(resourceContext.resourceType, resourceContext.instanceId()) << new MimeMessageMover << new MailPropertyExtractor); |
531 | setupPreprocessors(ENTITY_TYPE_FOLDER, QVector<Sink::Preprocessor*>() << new DefaultIndexUpdater<Sink::ApplicationDomain::Folder>); | 530 | setupPreprocessors(ENTITY_TYPE_FOLDER, QVector<Sink::Preprocessor*>()); |
532 | } | 531 | } |
533 | 532 | ||
534 | void ImapResource::removeFromDisk(const QByteArray &instanceIdentifier) | 533 | void ImapResource::removeFromDisk(const QByteArray &instanceIdentifier) |
diff --git a/examples/maildirresource/maildirresource.cpp b/examples/maildirresource/maildirresource.cpp index b89d78c..920bd28 100644 --- a/examples/maildirresource/maildirresource.cpp +++ b/examples/maildirresource/maildirresource.cpp | |||
@@ -33,7 +33,6 @@ | |||
33 | #include "domain/mail.h" | 33 | #include "domain/mail.h" |
34 | #include "definitions.h" | 34 | #include "definitions.h" |
35 | #include "facadefactory.h" | 35 | #include "facadefactory.h" |
36 | #include "indexupdater.h" | ||
37 | #include "libmaildir/maildir.h" | 36 | #include "libmaildir/maildir.h" |
38 | #include "inspection.h" | 37 | #include "inspection.h" |
39 | #include "synchronizer.h" | 38 | #include "synchronizer.h" |
@@ -85,7 +84,7 @@ class MaildirMimeMessageMover : public Sink::Preprocessor | |||
85 | public: | 84 | public: |
86 | MaildirMimeMessageMover(const QByteArray &resourceInstanceIdentifier, const QString &maildirPath) : mResourceInstanceIdentifier(resourceInstanceIdentifier), mMaildirPath(maildirPath) {} | 85 | MaildirMimeMessageMover(const QByteArray &resourceInstanceIdentifier, const QString &maildirPath) : mResourceInstanceIdentifier(resourceInstanceIdentifier), mMaildirPath(maildirPath) {} |
87 | 86 | ||
88 | QString getPath(const QByteArray &folderIdentifier, Sink::Storage::DataStore::Transaction &transaction) | 87 | QString getPath(const QByteArray &folderIdentifier) |
89 | { | 88 | { |
90 | if (folderIdentifier.isEmpty()) { | 89 | if (folderIdentifier.isEmpty()) { |
91 | return mMaildirPath; | 90 | return mMaildirPath; |
@@ -108,10 +107,10 @@ public: | |||
108 | return folderPath; | 107 | return folderPath; |
109 | } | 108 | } |
110 | 109 | ||
111 | QString moveMessage(const QString &oldPath, const QByteArray &folder, Sink::Storage::DataStore::Transaction &transaction) | 110 | QString moveMessage(const QString &oldPath, const QByteArray &folder) |
112 | { | 111 | { |
113 | if (oldPath.startsWith(Sink::temporaryFileLocation())) { | 112 | if (oldPath.startsWith(Sink::temporaryFileLocation())) { |
114 | const auto path = getPath(folder, transaction); | 113 | const auto path = getPath(folder); |
115 | KPIM::Maildir maildir(path, false); | 114 | KPIM::Maildir maildir(path, false); |
116 | if (!maildir.isValid(true)) { | 115 | if (!maildir.isValid(true)) { |
117 | SinkWarning() << "Maildir is not existing: " << path; | 116 | SinkWarning() << "Maildir is not existing: " << path; |
@@ -120,7 +119,7 @@ public: | |||
120 | return path + "/" + identifier; | 119 | return path + "/" + identifier; |
121 | } else { | 120 | } else { |
122 | //Handle moves | 121 | //Handle moves |
123 | const auto path = getPath(folder, transaction); | 122 | const auto path = getPath(folder); |
124 | KPIM::Maildir maildir(path, false); | 123 | KPIM::Maildir maildir(path, false); |
125 | if (!maildir.isValid(true)) { | 124 | if (!maildir.isValid(true)) { |
126 | SinkWarning() << "Maildir is not existing: " << path; | 125 | SinkWarning() << "Maildir is not existing: " << path; |
@@ -141,16 +140,15 @@ public: | |||
141 | } | 140 | } |
142 | } | 141 | } |
143 | 142 | ||
144 | void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | 143 | void newEntity(Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE |
145 | { | 144 | { |
146 | const auto mimeMessage = newEntity.getProperty("mimeMessage"); | 145 | const auto mimeMessage = newEntity.getProperty("mimeMessage"); |
147 | if (mimeMessage.isValid()) { | 146 | if (mimeMessage.isValid()) { |
148 | newEntity.setProperty("mimeMessage", moveMessage(mimeMessage.toString(), newEntity.getProperty("folder").toByteArray(), transaction)); | 147 | newEntity.setProperty("mimeMessage", moveMessage(mimeMessage.toString(), newEntity.getProperty("folder").toByteArray())); |
149 | } | 148 | } |
150 | } | 149 | } |
151 | 150 | ||
152 | void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, | 151 | void modifiedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity, Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE |
153 | Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | ||
154 | { | 152 | { |
155 | const auto mimeMessage = newEntity.getProperty("mimeMessage"); | 153 | const auto mimeMessage = newEntity.getProperty("mimeMessage"); |
156 | const auto newFolder = newEntity.getProperty("folder"); | 154 | const auto newFolder = newEntity.getProperty("folder"); |
@@ -158,7 +156,7 @@ public: | |||
158 | const bool folderChanged = newFolder.isValid() && newFolder.toString() != oldEntity.getProperty("mimeMessage").toString(); | 156 | const bool folderChanged = newFolder.isValid() && newFolder.toString() != oldEntity.getProperty("mimeMessage").toString(); |
159 | if (mimeMessageChanged || folderChanged) { | 157 | if (mimeMessageChanged || folderChanged) { |
160 | SinkTrace() << "Moving mime message: " << mimeMessageChanged << folderChanged; | 158 | SinkTrace() << "Moving mime message: " << mimeMessageChanged << folderChanged; |
161 | auto newPath = moveMessage(mimeMessage.toString(), newEntity.getProperty("folder").toByteArray(), transaction); | 159 | auto newPath = moveMessage(mimeMessage.toString(), newEntity.getProperty("folder").toByteArray()); |
162 | if (newPath != oldEntity.getProperty("mimeMessage").toString()) { | 160 | if (newPath != oldEntity.getProperty("mimeMessage").toString()) { |
163 | const auto oldPath = getFilePathFromMimeMessagePath(oldEntity.getProperty("mimeMessage").toString()); | 161 | const auto oldPath = getFilePathFromMimeMessagePath(oldEntity.getProperty("mimeMessage").toString()); |
164 | newEntity.setProperty("mimeMessage", newPath); | 162 | newEntity.setProperty("mimeMessage", newPath); |
@@ -168,7 +166,7 @@ public: | |||
168 | } | 166 | } |
169 | 167 | ||
170 | auto mimeMessagePath = newEntity.getProperty("mimeMessage").toString(); | 168 | auto mimeMessagePath = newEntity.getProperty("mimeMessage").toString(); |
171 | const auto maildirPath = getPath(newEntity.getProperty("folder").toByteArray(), transaction); | 169 | const auto maildirPath = getPath(newEntity.getProperty("folder").toByteArray()); |
172 | KPIM::Maildir maildir(maildirPath, false); | 170 | KPIM::Maildir maildir(maildirPath, false); |
173 | const auto file = getFilePathFromMimeMessagePath(mimeMessagePath); | 171 | const auto file = getFilePathFromMimeMessagePath(mimeMessagePath); |
174 | QString identifier = KPIM::Maildir::getKeyFromFile(file); | 172 | QString identifier = KPIM::Maildir::getKeyFromFile(file); |
@@ -185,7 +183,7 @@ public: | |||
185 | maildir.changeEntryFlags(identifier, flags); | 183 | maildir.changeEntryFlags(identifier, flags); |
186 | } | 184 | } |
187 | 185 | ||
188 | void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | 186 | void deletedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity) Q_DECL_OVERRIDE |
189 | { | 187 | { |
190 | const auto filePath = getFilePathFromMimeMessagePath(oldEntity.getProperty("mimeMessage").toString()); | 188 | const auto filePath = getFilePathFromMimeMessagePath(oldEntity.getProperty("mimeMessage").toString()); |
191 | QFile::remove(filePath); | 189 | QFile::remove(filePath); |
@@ -199,7 +197,7 @@ class FolderPreprocessor : public Sink::Preprocessor | |||
199 | public: | 197 | public: |
200 | FolderPreprocessor(const QString maildirPath) : mMaildirPath(maildirPath) {} | 198 | FolderPreprocessor(const QString maildirPath) : mMaildirPath(maildirPath) {} |
201 | 199 | ||
202 | void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | 200 | void newEntity(Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE |
203 | { | 201 | { |
204 | auto folderName = newEntity.getProperty("name").toString(); | 202 | auto folderName = newEntity.getProperty("name").toString(); |
205 | const auto path = mMaildirPath + "/" + folderName; | 203 | const auto path = mMaildirPath + "/" + folderName; |
@@ -207,12 +205,11 @@ public: | |||
207 | maildir.create(); | 205 | maildir.create(); |
208 | } | 206 | } |
209 | 207 | ||
210 | void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, | 208 | void modifiedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity, Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE |
211 | Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | ||
212 | { | 209 | { |
213 | } | 210 | } |
214 | 211 | ||
215 | void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | 212 | void deletedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity) Q_DECL_OVERRIDE |
216 | { | 213 | { |
217 | } | 214 | } |
218 | QString mMaildirPath; | 215 | QString mMaildirPath; |
@@ -440,8 +437,8 @@ MaildirResource::MaildirResource(const Sink::ResourceContext &resourceContext, c | |||
440 | changereplay->mMaildirPath = mMaildirPath; | 437 | changereplay->mMaildirPath = mMaildirPath; |
441 | setupChangereplay(changereplay); | 438 | setupChangereplay(changereplay); |
442 | 439 | ||
443 | setupPreprocessors(ENTITY_TYPE_MAIL, QVector<Sink::Preprocessor*>() << new SpecialPurposeProcessor(resourceContext.resourceType, resourceContext.instanceId()) << new MaildirMimeMessageMover(resourceContext.instanceId(), mMaildirPath) << new MaildirMailPropertyExtractor << new DefaultIndexUpdater<Sink::ApplicationDomain::Mail>); | 440 | setupPreprocessors(ENTITY_TYPE_MAIL, QVector<Sink::Preprocessor*>() << new SpecialPurposeProcessor(resourceContext.resourceType, resourceContext.instanceId()) << new MaildirMimeMessageMover(resourceContext.instanceId(), mMaildirPath) << new MaildirMailPropertyExtractor); |
444 | setupPreprocessors(ENTITY_TYPE_FOLDER, QVector<Sink::Preprocessor*>() << new FolderPreprocessor(mMaildirPath) << new DefaultIndexUpdater<Sink::ApplicationDomain::Folder>); | 441 | setupPreprocessors(ENTITY_TYPE_FOLDER, QVector<Sink::Preprocessor*>() << new FolderPreprocessor(mMaildirPath)); |
445 | 442 | ||
446 | KPIM::Maildir dir(mMaildirPath, true); | 443 | KPIM::Maildir dir(mMaildirPath, true); |
447 | SinkTrace() << "Started maildir resource for maildir: " << mMaildirPath; | 444 | SinkTrace() << "Started maildir resource for maildir: " << mMaildirPath; |
diff --git a/examples/mailtransportresource/mailtransportresource.cpp b/examples/mailtransportresource/mailtransportresource.cpp index 9a22c41..25231c8 100644 --- a/examples/mailtransportresource/mailtransportresource.cpp +++ b/examples/mailtransportresource/mailtransportresource.cpp | |||
@@ -40,7 +40,6 @@ | |||
40 | #include <resourceconfig.h> | 40 | #include <resourceconfig.h> |
41 | #include <pipeline.h> | 41 | #include <pipeline.h> |
42 | #include <mailpreprocessor.h> | 42 | #include <mailpreprocessor.h> |
43 | #include <indexupdater.h> | ||
44 | #include <adaptorfactoryregistry.h> | 43 | #include <adaptorfactoryregistry.h> |
45 | 44 | ||
46 | #define ENTITY_TYPE_MAIL "mail" | 45 | #define ENTITY_TYPE_MAIL "mail" |
@@ -162,7 +161,7 @@ MailtransportResource::MailtransportResource(const Sink::ResourceContext &resour | |||
162 | changereplay->mSettings = mSettings; | 161 | changereplay->mSettings = mSettings; |
163 | setupChangereplay(changereplay); | 162 | setupChangereplay(changereplay); |
164 | 163 | ||
165 | setupPreprocessors(ENTITY_TYPE_MAIL, QVector<Sink::Preprocessor*>() << new MimeMessageMover << new MailPropertyExtractor << new DefaultIndexUpdater<Sink::ApplicationDomain::Mail>); | 164 | setupPreprocessors(ENTITY_TYPE_MAIL, QVector<Sink::Preprocessor*>() << new MimeMessageMover << new MailPropertyExtractor); |
166 | } | 165 | } |
167 | 166 | ||
168 | void MailtransportResource::removeFromDisk(const QByteArray &instanceIdentifier) | 167 | void MailtransportResource::removeFromDisk(const QByteArray &instanceIdentifier) |
diff --git a/tests/mailquerybenchmark.cpp b/tests/mailquerybenchmark.cpp index c44b9f6..90cc4ba 100644 --- a/tests/mailquerybenchmark.cpp +++ b/tests/mailquerybenchmark.cpp | |||
@@ -32,7 +32,6 @@ | |||
32 | #include <common/store.h> | 32 | #include <common/store.h> |
33 | #include <common/pipeline.h> | 33 | #include <common/pipeline.h> |
34 | #include <common/index.h> | 34 | #include <common/index.h> |
35 | #include <common/indexupdater.h> | ||
36 | #include <common/adaptorfactoryregistry.h> | 35 | #include <common/adaptorfactoryregistry.h> |
37 | 36 | ||
38 | #include "hawd/dataset.h" | 37 | #include "hawd/dataset.h" |
@@ -64,10 +63,6 @@ class MailQueryBenchmark : public QObject | |||
64 | 63 | ||
65 | auto pipeline = QSharedPointer<Sink::Pipeline>::create(Sink::ResourceContext{resourceIdentifier, "test"}); | 64 | auto pipeline = QSharedPointer<Sink::Pipeline>::create(Sink::ResourceContext{resourceIdentifier, "test"}); |
66 | 65 | ||
67 | auto indexer = QSharedPointer<DefaultIndexUpdater<Mail>>::create(); | ||
68 | |||
69 | pipeline->setPreprocessors("mail", QVector<Sink::Preprocessor *>() << indexer.data()); | ||
70 | |||
71 | auto domainTypeAdaptorFactory = QSharedPointer<TestMailAdaptorFactory>::create(); | 66 | auto domainTypeAdaptorFactory = QSharedPointer<TestMailAdaptorFactory>::create(); |
72 | 67 | ||
73 | pipeline->startTransaction(); | 68 | pipeline->startTransaction(); |
diff --git a/tests/pipelinebenchmark.cpp b/tests/pipelinebenchmark.cpp index 16806c7..2e614ef 100644 --- a/tests/pipelinebenchmark.cpp +++ b/tests/pipelinebenchmark.cpp | |||
@@ -32,7 +32,6 @@ | |||
32 | #include <common/store.h> | 32 | #include <common/store.h> |
33 | #include <common/pipeline.h> | 33 | #include <common/pipeline.h> |
34 | #include <common/index.h> | 34 | #include <common/index.h> |
35 | #include <common/indexupdater.h> | ||
36 | #include <common/adaptorfactoryregistry.h> | 35 | #include <common/adaptorfactoryregistry.h> |
37 | 36 | ||
38 | #include "hawd/dataset.h" | 37 | #include "hawd/dataset.h" |
@@ -45,27 +44,6 @@ | |||
45 | #include "createentity_generated.h" | 44 | #include "createentity_generated.h" |
46 | #include "getrssusage.h" | 45 | #include "getrssusage.h" |
47 | 46 | ||
48 | // class IndexUpdater : public Sink::Preprocessor { | ||
49 | // public: | ||
50 | // void newEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | ||
51 | // { | ||
52 | // for (int i = 0; i < 10; i++) { | ||
53 | // Index ridIndex(QString("index.index%1").arg(i).toLatin1(), transaction); | ||
54 | // ridIndex.add("foo", uid); | ||
55 | // } | ||
56 | // } | ||
57 | // | ||
58 | // void modifiedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, const Sink::ApplicationDomain::BufferAdaptor &newEntity, | ||
59 | // Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | ||
60 | // { | ||
61 | // } | ||
62 | // | ||
63 | // void deletedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | ||
64 | // { | ||
65 | // } | ||
66 | // }; | ||
67 | // | ||
68 | |||
69 | /** | 47 | /** |
70 | * Benchmark pipeline processing speed. | 48 | * Benchmark pipeline processing speed. |
71 | * | 49 | * |
@@ -133,15 +111,9 @@ private slots: | |||
133 | resourceIdentifier = "sink.test.instance1"; | 111 | resourceIdentifier = "sink.test.instance1"; |
134 | } | 112 | } |
135 | 113 | ||
136 | void testWithoutIndex() | ||
137 | { | ||
138 | populateDatabase(10000, QVector<Sink::Preprocessor *>()); | ||
139 | } | ||
140 | |||
141 | void testWithIndex() | 114 | void testWithIndex() |
142 | { | 115 | { |
143 | auto indexer = QSharedPointer<DefaultIndexUpdater<Sink::ApplicationDomain::Mail>>::create(); | 116 | populateDatabase(10000, QVector<Sink::Preprocessor *>()); |
144 | populateDatabase(10000, QVector<Sink::Preprocessor *>() << indexer.data()); | ||
145 | } | 117 | } |
146 | }; | 118 | }; |
147 | 119 | ||
diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp index 112453e..4e04152 100644 --- a/tests/pipelinetest.cpp +++ b/tests/pipelinetest.cpp | |||
@@ -152,23 +152,22 @@ QByteArray deleteEntityCommand(const QByteArray &uid, qint64 revision) | |||
152 | class TestProcessor : public Sink::Preprocessor | 152 | class TestProcessor : public Sink::Preprocessor |
153 | { | 153 | { |
154 | public: | 154 | public: |
155 | void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | 155 | void newEntity(Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE |
156 | { | 156 | { |
157 | newUids << uid; | 157 | newUids << newEntity.identifier(); |
158 | newRevisions << revision; | 158 | newRevisions << newEntity.revision(); |
159 | } | 159 | } |
160 | 160 | ||
161 | void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, | 161 | void modifiedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity, Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE |
162 | Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | ||
163 | { | 162 | { |
164 | modifiedUids << uid; | 163 | modifiedUids << newEntity.identifier(); |
165 | modifiedRevisions << revision; | 164 | modifiedRevisions << newEntity.revision(); |
166 | } | 165 | } |
167 | 166 | ||
168 | void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | 167 | void deletedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity) Q_DECL_OVERRIDE |
169 | { | 168 | { |
170 | deletedUids << uid; | 169 | deletedUids << oldEntity.identifier(); |
171 | deletedRevisions << revision; | 170 | deletedRevisions << oldEntity.revision(); |
172 | deletedSummaries << oldEntity.getProperty("summary").toByteArray(); | 171 | deletedSummaries << oldEntity.getProperty("summary").toByteArray(); |
173 | } | 172 | } |
174 | 173 | ||
@@ -187,6 +186,17 @@ public: | |||
187 | class PipelineTest : public QObject | 186 | class PipelineTest : public QObject |
188 | { | 187 | { |
189 | Q_OBJECT | 188 | Q_OBJECT |
189 | |||
190 | QByteArray instanceIdentifier() | ||
191 | { | ||
192 | return "pipelinetest.instance1"; | ||
193 | } | ||
194 | |||
195 | Sink::ResourceContext getContext() | ||
196 | { | ||
197 | return Sink::ResourceContext{instanceIdentifier(), "test", Sink::AdaptorFactoryRegistry::instance().getFactories("test")}; | ||
198 | } | ||
199 | |||
190 | private slots: | 200 | private slots: |
191 | void initTestCase() | 201 | void initTestCase() |
192 | { | 202 | { |
@@ -195,7 +205,7 @@ private slots: | |||
195 | 205 | ||
196 | void init() | 206 | void init() |
197 | { | 207 | { |
198 | removeFromDisk("sink.pipelinetest.instance1"); | 208 | removeFromDisk(instanceIdentifier()); |
199 | } | 209 | } |
200 | 210 | ||
201 | void testCreate() | 211 | void testCreate() |
@@ -203,15 +213,22 @@ private slots: | |||
203 | flatbuffers::FlatBufferBuilder entityFbb; | 213 | flatbuffers::FlatBufferBuilder entityFbb; |
204 | auto command = createEntityCommand(createEvent(entityFbb)); | 214 | auto command = createEntityCommand(createEvent(entityFbb)); |
205 | 215 | ||
206 | Sink::Pipeline pipeline(Sink::ResourceContext{"sink.pipelinetest.instance1", "test"}); | 216 | Sink::Pipeline pipeline(getContext()); |
207 | 217 | ||
208 | pipeline.startTransaction(); | 218 | pipeline.startTransaction(); |
209 | pipeline.newEntity(command.constData(), command.size()); | 219 | pipeline.newEntity(command.constData(), command.size()); |
210 | pipeline.commit(); | 220 | pipeline.commit(); |
211 | 221 | ||
212 | auto result = getKeys("sink.pipelinetest.instance1", "event.main"); | 222 | auto result = getKeys(instanceIdentifier(), "event.main"); |
213 | qDebug() << result; | 223 | qDebug() << result; |
214 | QCOMPARE(result.size(), 1); | 224 | QCOMPARE(result.size(), 1); |
225 | |||
226 | auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); | ||
227 | auto buffer = getEntity(instanceIdentifier(), "event.main", result.first()); | ||
228 | QVERIFY(!buffer.isEmpty()); | ||
229 | Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size()); | ||
230 | auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity()); | ||
231 | QVERIFY2(adaptor->getProperty("summary").toString() == QString("summary"), "The modification isn't applied."); | ||
215 | } | 232 | } |
216 | 233 | ||
217 | void testModify() | 234 | void testModify() |
@@ -219,7 +236,7 @@ private slots: | |||
219 | flatbuffers::FlatBufferBuilder entityFbb; | 236 | flatbuffers::FlatBufferBuilder entityFbb; |
220 | auto command = createEntityCommand(createEvent(entityFbb, "summary", "description")); | 237 | auto command = createEntityCommand(createEvent(entityFbb, "summary", "description")); |
221 | 238 | ||
222 | Sink::Pipeline pipeline(Sink::ResourceContext{"sink.pipelinetest.instance1", "test"}); | 239 | Sink::Pipeline pipeline(getContext()); |
223 | 240 | ||
224 | auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); | 241 | auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); |
225 | 242 | ||
@@ -229,7 +246,7 @@ private slots: | |||
229 | pipeline.commit(); | 246 | pipeline.commit(); |
230 | 247 | ||
231 | // Get uid of written entity | 248 | // Get uid of written entity |
232 | auto keys = getKeys("sink.pipelinetest.instance1", "event.main"); | 249 | auto keys = getKeys(instanceIdentifier(), "event.main"); |
233 | QCOMPARE(keys.size(), 1); | 250 | QCOMPARE(keys.size(), 1); |
234 | const auto key = keys.first(); | 251 | const auto key = keys.first(); |
235 | const auto uid = Sink::Storage::DataStore::uidFromKey(key); | 252 | const auto uid = Sink::Storage::DataStore::uidFromKey(key); |
@@ -242,7 +259,7 @@ private slots: | |||
242 | pipeline.commit(); | 259 | pipeline.commit(); |
243 | 260 | ||
244 | // Ensure we've got the new revision with the modification | 261 | // Ensure we've got the new revision with the modification |
245 | auto buffer = getEntity("sink.pipelinetest.instance1", "event.main", Sink::Storage::DataStore::assembleKey(uid, 2)); | 262 | auto buffer = getEntity(instanceIdentifier(), "event.main", Sink::Storage::DataStore::assembleKey(uid, 2)); |
246 | QVERIFY(!buffer.isEmpty()); | 263 | QVERIFY(!buffer.isEmpty()); |
247 | Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size()); | 264 | Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size()); |
248 | auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity()); | 265 | auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity()); |
@@ -251,7 +268,7 @@ private slots: | |||
251 | QVERIFY2(adaptor->getProperty("description").toString() == QString("description"), "The modification has sideeffects."); | 268 | QVERIFY2(adaptor->getProperty("description").toString() == QString("description"), "The modification has sideeffects."); |
252 | 269 | ||
253 | // Both revisions are in the store at this point | 270 | // Both revisions are in the store at this point |
254 | QCOMPARE(getKeys("sink.pipelinetest.instance1", "event.main").size(), 2); | 271 | QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 2); |
255 | 272 | ||
256 | // Cleanup old revisions | 273 | // Cleanup old revisions |
257 | pipeline.startTransaction(); | 274 | pipeline.startTransaction(); |
@@ -259,7 +276,7 @@ private slots: | |||
259 | pipeline.commit(); | 276 | pipeline.commit(); |
260 | 277 | ||
261 | // And now only the latest revision is left | 278 | // And now only the latest revision is left |
262 | QCOMPARE(getKeys("sink.pipelinetest.instance1", "event.main").size(), 1); | 279 | QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 1); |
263 | } | 280 | } |
264 | 281 | ||
265 | void testModifyWithUnrelatedOperationInbetween() | 282 | void testModifyWithUnrelatedOperationInbetween() |
@@ -267,7 +284,7 @@ private slots: | |||
267 | flatbuffers::FlatBufferBuilder entityFbb; | 284 | flatbuffers::FlatBufferBuilder entityFbb; |
268 | auto command = createEntityCommand(createEvent(entityFbb)); | 285 | auto command = createEntityCommand(createEvent(entityFbb)); |
269 | 286 | ||
270 | Sink::Pipeline pipeline(Sink::ResourceContext{"sink.pipelinetest.instance1", "test"}); | 287 | Sink::Pipeline pipeline(getContext()); |
271 | 288 | ||
272 | auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); | 289 | auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); |
273 | 290 | ||
@@ -277,7 +294,7 @@ private slots: | |||
277 | pipeline.commit(); | 294 | pipeline.commit(); |
278 | 295 | ||
279 | // Get uid of written entity | 296 | // Get uid of written entity |
280 | auto keys = getKeys("sink.pipelinetest.instance1", "event.main"); | 297 | auto keys = getKeys(instanceIdentifier(), "event.main"); |
281 | QCOMPARE(keys.size(), 1); | 298 | QCOMPARE(keys.size(), 1); |
282 | const auto uid = Sink::Storage::DataStore::uidFromKey(keys.first()); | 299 | const auto uid = Sink::Storage::DataStore::uidFromKey(keys.first()); |
283 | 300 | ||
@@ -299,7 +316,7 @@ private slots: | |||
299 | pipeline.commit(); | 316 | pipeline.commit(); |
300 | 317 | ||
301 | // Ensure we've got the new revision with the modification | 318 | // Ensure we've got the new revision with the modification |
302 | auto buffer = getEntity("sink.pipelinetest.instance1", "event.main", Sink::Storage::DataStore::assembleKey(uid, 3)); | 319 | auto buffer = getEntity(instanceIdentifier(), "event.main", Sink::Storage::DataStore::assembleKey(uid, 3)); |
303 | QVERIFY(!buffer.isEmpty()); | 320 | QVERIFY(!buffer.isEmpty()); |
304 | Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size()); | 321 | Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size()); |
305 | auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity()); | 322 | auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity()); |
@@ -310,14 +327,14 @@ private slots: | |||
310 | { | 327 | { |
311 | flatbuffers::FlatBufferBuilder entityFbb; | 328 | flatbuffers::FlatBufferBuilder entityFbb; |
312 | auto command = createEntityCommand(createEvent(entityFbb)); | 329 | auto command = createEntityCommand(createEvent(entityFbb)); |
313 | Sink::Pipeline pipeline(Sink::ResourceContext{"sink.pipelinetest.instance1", "test"}); | 330 | Sink::Pipeline pipeline(getContext()); |
314 | 331 | ||
315 | // Create the initial revision | 332 | // Create the initial revision |
316 | pipeline.startTransaction(); | 333 | pipeline.startTransaction(); |
317 | pipeline.newEntity(command.constData(), command.size()); | 334 | pipeline.newEntity(command.constData(), command.size()); |
318 | pipeline.commit(); | 335 | pipeline.commit(); |
319 | 336 | ||
320 | auto result = getKeys("sink.pipelinetest.instance1", "event.main"); | 337 | auto result = getKeys(instanceIdentifier(), "event.main"); |
321 | QCOMPARE(result.size(), 1); | 338 | QCOMPARE(result.size(), 1); |
322 | 339 | ||
323 | const auto uid = Sink::Storage::DataStore::uidFromKey(result.first()); | 340 | const auto uid = Sink::Storage::DataStore::uidFromKey(result.first()); |
@@ -329,7 +346,7 @@ private slots: | |||
329 | pipeline.commit(); | 346 | pipeline.commit(); |
330 | 347 | ||
331 | // We have a new revision that indicates the deletion | 348 | // We have a new revision that indicates the deletion |
332 | QCOMPARE(getKeys("sink.pipelinetest.instance1", "event.main").size(), 2); | 349 | QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 2); |
333 | 350 | ||
334 | // Cleanup old revisions | 351 | // Cleanup old revisions |
335 | pipeline.startTransaction(); | 352 | pipeline.startTransaction(); |
@@ -337,7 +354,7 @@ private slots: | |||
337 | pipeline.commit(); | 354 | pipeline.commit(); |
338 | 355 | ||
339 | // And all revisions are gone | 356 | // And all revisions are gone |
340 | QCOMPARE(getKeys("sink.pipelinetest.instance1", "event.main").size(), 0); | 357 | QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 0); |
341 | } | 358 | } |
342 | 359 | ||
343 | void testPreprocessor() | 360 | void testPreprocessor() |
@@ -346,7 +363,7 @@ private slots: | |||
346 | 363 | ||
347 | auto testProcessor = new TestProcessor; | 364 | auto testProcessor = new TestProcessor; |
348 | 365 | ||
349 | Sink::Pipeline pipeline(Sink::ResourceContext{"sink.pipelinetest.instance1", "test"}); | 366 | Sink::Pipeline pipeline(getContext()); |
350 | pipeline.setPreprocessors("event", QVector<Sink::Preprocessor *>() << testProcessor); | 367 | pipeline.setPreprocessors("event", QVector<Sink::Preprocessor *>() << testProcessor); |
351 | pipeline.startTransaction(); | 368 | pipeline.startTransaction(); |
352 | // pipeline.setAdaptorFactory("event", QSharedPointer<TestEventAdaptorFactory>::create()); | 369 | // pipeline.setAdaptorFactory("event", QSharedPointer<TestEventAdaptorFactory>::create()); |
@@ -363,7 +380,7 @@ private slots: | |||
363 | pipeline.commit(); | 380 | pipeline.commit(); |
364 | entityFbb.Clear(); | 381 | entityFbb.Clear(); |
365 | pipeline.startTransaction(); | 382 | pipeline.startTransaction(); |
366 | auto keys = getKeys("sink.pipelinetest.instance1", "event.main"); | 383 | auto keys = getKeys(instanceIdentifier(), "event.main"); |
367 | QCOMPARE(keys.size(), 1); | 384 | QCOMPARE(keys.size(), 1); |
368 | const auto uid = Sink::Storage::DataStore::uidFromKey(keys.first()); | 385 | const auto uid = Sink::Storage::DataStore::uidFromKey(keys.first()); |
369 | { | 386 | { |