diff options
Diffstat (limited to 'common')
-rw-r--r-- | common/domain/event.cpp | 21 | ||||
-rw-r--r-- | common/domain/event.h | 2 | ||||
-rw-r--r-- | common/domain/folder.cpp | 22 | ||||
-rw-r--r-- | common/domain/folder.h | 2 | ||||
-rw-r--r-- | common/domain/mail.cpp | 13 | ||||
-rw-r--r-- | common/domain/mail.h | 2 | ||||
-rw-r--r-- | common/domainadaptor.h | 16 | ||||
-rw-r--r-- | common/genericresource.cpp | 6 | ||||
-rw-r--r-- | common/indexupdater.h | 91 | ||||
-rw-r--r-- | common/mailpreprocessor.cpp | 10 | ||||
-rw-r--r-- | common/mailpreprocessor.h | 10 | ||||
-rw-r--r-- | common/pipeline.cpp | 275 | ||||
-rw-r--r-- | common/pipeline.h | 44 | ||||
-rw-r--r-- | common/specialpurposepreprocessor.cpp | 18 | ||||
-rw-r--r-- | common/specialpurposepreprocessor.h | 8 | ||||
-rw-r--r-- | common/storage/entitystore.cpp | 185 | ||||
-rw-r--r-- | common/storage/entitystore.h | 11 | ||||
-rw-r--r-- | common/typeindex.cpp | 16 | ||||
-rw-r--r-- | common/typeindex.h | 5 |
19 files changed, 312 insertions, 445 deletions
diff --git a/common/domain/event.cpp b/common/domain/event.cpp index d50652d..41ec625 100644 --- a/common/domain/event.cpp +++ b/common/domain/event.cpp | |||
@@ -47,27 +47,6 @@ void TypeImplementation<Event>::configureIndex(TypeIndex &index) | |||
47 | index.addProperty<QByteArray>(Event::Uid::name); | 47 | index.addProperty<QByteArray>(Event::Uid::name); |
48 | } | 48 | } |
49 | 49 | ||
50 | static TypeIndex &getIndex() | ||
51 | { | ||
52 | QMutexLocker locker(&sMutex); | ||
53 | static TypeIndex *index = 0; | ||
54 | if (!index) { | ||
55 | index = new TypeIndex("event"); | ||
56 | TypeImplementation<Event>::configureIndex(*index); | ||
57 | } | ||
58 | return *index; | ||
59 | } | ||
60 | |||
61 | void TypeImplementation<Event>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) | ||
62 | { | ||
63 | return getIndex().add(identifier, bufferAdaptor, transaction); | ||
64 | } | ||
65 | |||
66 | void TypeImplementation<Event>::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) | ||
67 | { | ||
68 | return getIndex().remove(identifier, bufferAdaptor, transaction); | ||
69 | } | ||
70 | |||
71 | QSharedPointer<ReadPropertyMapper<TypeImplementation<Event>::Buffer> > TypeImplementation<Event>::initializeReadPropertyMapper() | 50 | QSharedPointer<ReadPropertyMapper<TypeImplementation<Event>::Buffer> > TypeImplementation<Event>::initializeReadPropertyMapper() |
72 | { | 51 | { |
73 | auto propertyMapper = QSharedPointer<ReadPropertyMapper<Buffer> >::create(); | 52 | auto propertyMapper = QSharedPointer<ReadPropertyMapper<Buffer> >::create(); |
diff --git a/common/domain/event.h b/common/domain/event.h index 18e0f20..fbaf4ed 100644 --- a/common/domain/event.h +++ b/common/domain/event.h | |||
@@ -56,8 +56,6 @@ public: | |||
56 | typedef Sink::ApplicationDomain::Buffer::EventBuilder BufferBuilder; | 56 | typedef Sink::ApplicationDomain::Buffer::EventBuilder BufferBuilder; |
57 | static void configureIndex(TypeIndex &index); | 57 | static void configureIndex(TypeIndex &index); |
58 | static QSet<QByteArray> indexedProperties(); | 58 | static QSet<QByteArray> indexedProperties(); |
59 | static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); | ||
60 | static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); | ||
61 | static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); | 59 | static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); |
62 | static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); | 60 | static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); |
63 | }; | 61 | }; |
diff --git a/common/domain/folder.cpp b/common/domain/folder.cpp index 94727a3..058035a 100644 --- a/common/domain/folder.cpp +++ b/common/domain/folder.cpp | |||
@@ -50,28 +50,6 @@ void TypeImplementation<Folder>::configureIndex(TypeIndex &index) | |||
50 | index.addProperty<QString>(Folder::Name::name); | 50 | index.addProperty<QString>(Folder::Name::name); |
51 | } | 51 | } |
52 | 52 | ||
53 | static TypeIndex &getIndex() | ||
54 | { | ||
55 | QMutexLocker locker(&sMutex); | ||
56 | static TypeIndex *index = 0; | ||
57 | if (!index) { | ||
58 | index = new TypeIndex("folder"); | ||
59 | TypeImplementation<Folder>::configureIndex(*index); | ||
60 | } | ||
61 | return *index; | ||
62 | } | ||
63 | |||
64 | void TypeImplementation<Folder>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) | ||
65 | { | ||
66 | SinkTrace() << "Indexing " << identifier; | ||
67 | getIndex().add(identifier, bufferAdaptor, transaction); | ||
68 | } | ||
69 | |||
70 | void TypeImplementation<Folder>::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) | ||
71 | { | ||
72 | getIndex().remove(identifier, bufferAdaptor, transaction); | ||
73 | } | ||
74 | |||
75 | QSharedPointer<ReadPropertyMapper<TypeImplementation<Folder>::Buffer> > TypeImplementation<Folder>::initializeReadPropertyMapper() | 53 | QSharedPointer<ReadPropertyMapper<TypeImplementation<Folder>::Buffer> > TypeImplementation<Folder>::initializeReadPropertyMapper() |
76 | { | 54 | { |
77 | auto propertyMapper = QSharedPointer<ReadPropertyMapper<Buffer> >::create(); | 55 | auto propertyMapper = QSharedPointer<ReadPropertyMapper<Buffer> >::create(); |
diff --git a/common/domain/folder.h b/common/domain/folder.h index ea0d79a..47a544b 100644 --- a/common/domain/folder.h +++ b/common/domain/folder.h | |||
@@ -50,8 +50,6 @@ public: | |||
50 | typedef Sink::ApplicationDomain::Buffer::FolderBuilder BufferBuilder; | 50 | typedef Sink::ApplicationDomain::Buffer::FolderBuilder BufferBuilder; |
51 | static void configureIndex(TypeIndex &index); | 51 | static void configureIndex(TypeIndex &index); |
52 | static QSet<QByteArray> indexedProperties(); | 52 | static QSet<QByteArray> indexedProperties(); |
53 | static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); | ||
54 | static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); | ||
55 | static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); | 53 | static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); |
56 | static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); | 54 | static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); |
57 | }; | 55 | }; |
diff --git a/common/domain/mail.cpp b/common/domain/mail.cpp index b0a3aae..9d58767 100644 --- a/common/domain/mail.cpp +++ b/common/domain/mail.cpp | |||
@@ -170,19 +170,6 @@ static void updateThreadingIndex(const QByteArray &identifier, const BufferAdapt | |||
170 | } | 170 | } |
171 | } | 171 | } |
172 | 172 | ||
173 | void TypeImplementation<Mail>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) | ||
174 | { | ||
175 | SinkTrace() << "Indexing " << identifier; | ||
176 | getIndex().add(identifier, bufferAdaptor, transaction); | ||
177 | updateThreadingIndex(identifier, bufferAdaptor, transaction); | ||
178 | } | ||
179 | |||
180 | void TypeImplementation<Mail>::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) | ||
181 | { | ||
182 | getIndex().remove(identifier, bufferAdaptor, transaction); | ||
183 | //TODO cleanup threading index | ||
184 | } | ||
185 | |||
186 | QSharedPointer<ReadPropertyMapper<TypeImplementation<Mail>::Buffer> > TypeImplementation<Mail>::initializeReadPropertyMapper() | 173 | QSharedPointer<ReadPropertyMapper<TypeImplementation<Mail>::Buffer> > TypeImplementation<Mail>::initializeReadPropertyMapper() |
187 | { | 174 | { |
188 | auto propertyMapper = QSharedPointer<ReadPropertyMapper<Buffer> >::create(); | 175 | auto propertyMapper = QSharedPointer<ReadPropertyMapper<Buffer> >::create(); |
diff --git a/common/domain/mail.h b/common/domain/mail.h index 81a0d1c..c0cfc55 100644 --- a/common/domain/mail.h +++ b/common/domain/mail.h | |||
@@ -50,8 +50,6 @@ public: | |||
50 | typedef Sink::ApplicationDomain::Buffer::MailBuilder BufferBuilder; | 50 | typedef Sink::ApplicationDomain::Buffer::MailBuilder BufferBuilder; |
51 | static void configureIndex(TypeIndex &index); | 51 | static void configureIndex(TypeIndex &index); |
52 | static QSet<QByteArray> indexedProperties(); | 52 | static QSet<QByteArray> indexedProperties(); |
53 | static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); | ||
54 | static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); | ||
55 | static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); | 53 | static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); |
56 | static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); | 54 | static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); |
57 | }; | 55 | }; |
diff --git a/common/domainadaptor.h b/common/domainadaptor.h index 6a9d755..195d5ec 100644 --- a/common/domainadaptor.h +++ b/common/domainadaptor.h | |||
@@ -87,11 +87,11 @@ static void createBufferPartBuffer(const Sink::ApplicationDomain::ApplicationDom | |||
87 | * A generic adaptor implementation that uses a property mapper to read/write values. | 87 | * A generic adaptor implementation that uses a property mapper to read/write values. |
88 | */ | 88 | */ |
89 | template <class LocalBuffer, class ResourceBuffer> | 89 | template <class LocalBuffer, class ResourceBuffer> |
90 | class GenericBufferAdaptor : public Sink::ApplicationDomain::BufferAdaptor | 90 | class DatastoreBufferAdaptor : public Sink::ApplicationDomain::BufferAdaptor |
91 | { | 91 | { |
92 | SINK_DEBUG_AREA("bufferadaptor") | 92 | SINK_DEBUG_AREA("bufferadaptor") |
93 | public: | 93 | public: |
94 | GenericBufferAdaptor() : BufferAdaptor() | 94 | DatastoreBufferAdaptor() : BufferAdaptor() |
95 | { | 95 | { |
96 | } | 96 | } |
97 | 97 | ||
@@ -148,18 +148,14 @@ public: | |||
148 | /** | 148 | /** |
149 | * Creates an adaptor for the given domain and resource types. | 149 | * Creates an adaptor for the given domain and resource types. |
150 | * | 150 | * |
151 | * This returns by default a GenericBufferAdaptor initialized with the corresponding property mappers. | 151 | * This returns by default a DatastoreBufferAdaptor initialized with the corresponding property mappers. |
152 | */ | 152 | */ |
153 | virtual QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> createAdaptor(const Sink::Entity &entity) Q_DECL_OVERRIDE | 153 | virtual QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> createAdaptor(const Sink::Entity &entity) Q_DECL_OVERRIDE |
154 | { | 154 | { |
155 | const auto resourceBuffer = Sink::EntityBuffer::readBuffer<ResourceBuffer>(entity.resource()); | 155 | auto adaptor = QSharedPointer<DatastoreBufferAdaptor<LocalBuffer, ResourceBuffer>>::create(); |
156 | const auto localBuffer = Sink::EntityBuffer::readBuffer<LocalBuffer>(entity.local()); | 156 | adaptor->mLocalBuffer = Sink::EntityBuffer::readBuffer<LocalBuffer>(entity.local()); |
157 | // const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); | ||
158 | |||
159 | auto adaptor = QSharedPointer<GenericBufferAdaptor<LocalBuffer, ResourceBuffer>>::create(); | ||
160 | adaptor->mLocalBuffer = localBuffer; | ||
161 | adaptor->mLocalMapper = mLocalMapper; | 157 | adaptor->mLocalMapper = mLocalMapper; |
162 | adaptor->mResourceBuffer = resourceBuffer; | 158 | adaptor->mResourceBuffer = Sink::EntityBuffer::readBuffer<ResourceBuffer>(entity.resource()); |
163 | adaptor->mResourceMapper = mResourceMapper; | 159 | adaptor->mResourceMapper = mResourceMapper; |
164 | return adaptor; | 160 | return adaptor; |
165 | } | 161 | } |
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index e0d395a..8f08f3d 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -59,9 +59,9 @@ class CommandProcessor : public QObject | |||
59 | public: | 59 | public: |
60 | CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) | 60 | CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) |
61 | { | 61 | { |
62 | mLowerBoundRevision = DataStore::maxRevision(mPipeline->storage().createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { | 62 | mPipeline->startTransaction(); |
63 | SinkWarning() << error.message; | 63 | mLowerBoundRevision = mPipeline->revision(); |
64 | })); | 64 | mPipeline->commit(); |
65 | 65 | ||
66 | for (auto queue : mCommandQueues) { | 66 | for (auto queue : mCommandQueues) { |
67 | const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); | 67 | const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); |
diff --git a/common/indexupdater.h b/common/indexupdater.h deleted file mode 100644 index 221a4ed..0000000 --- a/common/indexupdater.h +++ /dev/null | |||
@@ -1,91 +0,0 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm> | ||
3 | * | ||
4 | * This program is free software; you can redistribute it and/or modify | ||
5 | * it under the terms of the GNU General Public License as published by | ||
6 | * the Free Software Foundation; either version 2 of the License, or | ||
7 | * (at your option) any later version. | ||
8 | * | ||
9 | * This program is distributed in the hope that it will be useful, | ||
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
12 | * GNU General Public License for more details. | ||
13 | * | ||
14 | * You should have received a copy of the GNU General Public License | ||
15 | * along with this program; if not, write to the | ||
16 | * Free Software Foundation, Inc., | ||
17 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | ||
18 | */ | ||
19 | #pragma once | ||
20 | |||
21 | #include <pipeline.h> | ||
22 | #include <index.h> | ||
23 | |||
24 | class IndexUpdater : public Sink::Preprocessor | ||
25 | { | ||
26 | public: | ||
27 | IndexUpdater(const QByteArray &index, const QByteArray &type, const QByteArray &property) : mIndexIdentifier(index), mBufferType(type), mProperty(property) | ||
28 | { | ||
29 | } | ||
30 | |||
31 | void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | ||
32 | { | ||
33 | add(newEntity.getProperty(mProperty), uid, transaction); | ||
34 | } | ||
35 | |||
36 | void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, | ||
37 | Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | ||
38 | { | ||
39 | remove(oldEntity.getProperty(mProperty), uid, transaction); | ||
40 | add(newEntity.getProperty(mProperty), uid, transaction); | ||
41 | } | ||
42 | |||
43 | void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | ||
44 | { | ||
45 | remove(oldEntity.getProperty(mProperty), uid, transaction); | ||
46 | } | ||
47 | |||
48 | private: | ||
49 | void add(const QVariant &value, const QByteArray &uid, Sink::Storage::DataStore::Transaction &transaction) | ||
50 | { | ||
51 | if (value.isValid()) { | ||
52 | Index(mIndexIdentifier, transaction).add(value.toByteArray(), uid); | ||
53 | } | ||
54 | } | ||
55 | |||
56 | void remove(const QVariant &value, const QByteArray &uid, Sink::Storage::DataStore::Transaction &transaction) | ||
57 | { | ||
58 | if (value.isValid()) { | ||
59 | const auto data = value.toByteArray(); | ||
60 | if (!data.isEmpty()) { | ||
61 | Index(mIndexIdentifier, transaction).remove(data, uid); | ||
62 | } | ||
63 | } | ||
64 | } | ||
65 | |||
66 | QByteArray mIndexIdentifier; | ||
67 | QByteArray mBufferType; | ||
68 | QByteArray mProperty; | ||
69 | }; | ||
70 | |||
71 | template <typename DomainType> | ||
72 | class DefaultIndexUpdater : public Sink::Preprocessor | ||
73 | { | ||
74 | public: | ||
75 | void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | ||
76 | { | ||
77 | Sink::ApplicationDomain::TypeImplementation<DomainType>::index(uid, newEntity, transaction); | ||
78 | } | ||
79 | |||
80 | void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, | ||
81 | Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | ||
82 | { | ||
83 | Sink::ApplicationDomain::TypeImplementation<DomainType>::removeIndex(uid, oldEntity, transaction); | ||
84 | Sink::ApplicationDomain::TypeImplementation<DomainType>::index(uid, newEntity, transaction); | ||
85 | } | ||
86 | |||
87 | void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | ||
88 | { | ||
89 | Sink::ApplicationDomain::TypeImplementation<DomainType>::removeIndex(uid, oldEntity, transaction); | ||
90 | } | ||
91 | }; | ||
diff --git a/common/mailpreprocessor.cpp b/common/mailpreprocessor.cpp index b978323..d45afe6 100644 --- a/common/mailpreprocessor.cpp +++ b/common/mailpreprocessor.cpp | |||
@@ -116,7 +116,7 @@ static void updatedIndexedProperties(Sink::ApplicationDomain::Mail &mail, KMime: | |||
116 | } | 116 | } |
117 | } | 117 | } |
118 | 118 | ||
119 | void MailPropertyExtractor::newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction) | 119 | void MailPropertyExtractor::newEntity(Sink::ApplicationDomain::Mail &mail) |
120 | { | 120 | { |
121 | MimeMessageReader mimeMessageReader(getFilePathFromMimeMessagePath(mail.getMimeMessagePath())); | 121 | MimeMessageReader mimeMessageReader(getFilePathFromMimeMessagePath(mail.getMimeMessagePath())); |
122 | auto msg = mimeMessageReader.mimeMessage(); | 122 | auto msg = mimeMessageReader.mimeMessage(); |
@@ -125,7 +125,7 @@ void MailPropertyExtractor::newEntity(Sink::ApplicationDomain::Mail &mail, Sink: | |||
125 | } | 125 | } |
126 | } | 126 | } |
127 | 127 | ||
128 | void MailPropertyExtractor::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail,Sink::Storage::DataStore::Transaction &transaction) | 128 | void MailPropertyExtractor::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail) |
129 | { | 129 | { |
130 | MimeMessageReader mimeMessageReader(getFilePathFromMimeMessagePath(newMail.getMimeMessagePath())); | 130 | MimeMessageReader mimeMessageReader(getFilePathFromMimeMessagePath(newMail.getMimeMessagePath())); |
131 | auto msg = mimeMessageReader.mimeMessage(); | 131 | auto msg = mimeMessageReader.mimeMessage(); |
@@ -161,21 +161,21 @@ QString MimeMessageMover::moveMessage(const QString &oldPath, const Sink::Applic | |||
161 | return oldPath; | 161 | return oldPath; |
162 | } | 162 | } |
163 | 163 | ||
164 | void MimeMessageMover::newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction) | 164 | void MimeMessageMover::newEntity(Sink::ApplicationDomain::Mail &mail) |
165 | { | 165 | { |
166 | if (!mail.getMimeMessagePath().isEmpty()) { | 166 | if (!mail.getMimeMessagePath().isEmpty()) { |
167 | mail.setMimeMessagePath(moveMessage(mail.getMimeMessagePath(), mail)); | 167 | mail.setMimeMessagePath(moveMessage(mail.getMimeMessagePath(), mail)); |
168 | } | 168 | } |
169 | } | 169 | } |
170 | 170 | ||
171 | void MimeMessageMover::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail, Sink::Storage::DataStore::Transaction &transaction) | 171 | void MimeMessageMover::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail) |
172 | { | 172 | { |
173 | if (!newMail.getMimeMessagePath().isEmpty()) { | 173 | if (!newMail.getMimeMessagePath().isEmpty()) { |
174 | newMail.setMimeMessagePath(moveMessage(newMail.getMimeMessagePath(), newMail)); | 174 | newMail.setMimeMessagePath(moveMessage(newMail.getMimeMessagePath(), newMail)); |
175 | } | 175 | } |
176 | } | 176 | } |
177 | 177 | ||
178 | void MimeMessageMover::deletedEntity(const Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction) | 178 | void MimeMessageMover::deletedEntity(const Sink::ApplicationDomain::Mail &mail) |
179 | { | 179 | { |
180 | QFile::remove(mail.getMimeMessagePath()); | 180 | QFile::remove(mail.getMimeMessagePath()); |
181 | } | 181 | } |
diff --git a/common/mailpreprocessor.h b/common/mailpreprocessor.h index c66517e..f979f22 100644 --- a/common/mailpreprocessor.h +++ b/common/mailpreprocessor.h | |||
@@ -24,8 +24,8 @@ class SINK_EXPORT MailPropertyExtractor : public Sink::EntityPreprocessor<Sink:: | |||
24 | { | 24 | { |
25 | public: | 25 | public: |
26 | virtual ~MailPropertyExtractor(){} | 26 | virtual ~MailPropertyExtractor(){} |
27 | virtual void newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE; | 27 | virtual void newEntity(Sink::ApplicationDomain::Mail &mail) Q_DECL_OVERRIDE; |
28 | virtual void modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail,Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE; | 28 | virtual void modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail) Q_DECL_OVERRIDE; |
29 | protected: | 29 | protected: |
30 | virtual QString getFilePathFromMimeMessagePath(const QString &) const; | 30 | virtual QString getFilePathFromMimeMessagePath(const QString &) const; |
31 | }; | 31 | }; |
@@ -36,9 +36,9 @@ public: | |||
36 | MimeMessageMover(); | 36 | MimeMessageMover(); |
37 | virtual ~MimeMessageMover(){} | 37 | virtual ~MimeMessageMover(){} |
38 | 38 | ||
39 | void newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE; | 39 | void newEntity(Sink::ApplicationDomain::Mail &mail) Q_DECL_OVERRIDE; |
40 | void modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE; | 40 | void modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail) Q_DECL_OVERRIDE; |
41 | void deletedEntity(const Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE; | 41 | void deletedEntity(const Sink::ApplicationDomain::Mail &mail) Q_DECL_OVERRIDE; |
42 | 42 | ||
43 | private: | 43 | private: |
44 | QString moveMessage(const QString &oldPath, const Sink::ApplicationDomain::Mail &mail); | 44 | QString moveMessage(const QString &oldPath, const Sink::ApplicationDomain::Mail &mail); |
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index e257857..ea59ae9 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -37,6 +37,7 @@ | |||
37 | #include "adaptorfactoryregistry.h" | 37 | #include "adaptorfactoryregistry.h" |
38 | #include "definitions.h" | 38 | #include "definitions.h" |
39 | #include "bufferutils.h" | 39 | #include "bufferutils.h" |
40 | #include "storage/entitystore.h" | ||
40 | 41 | ||
41 | SINK_DEBUG_AREA("pipeline") | 42 | SINK_DEBUG_AREA("pipeline") |
42 | 43 | ||
@@ -46,31 +47,18 @@ using namespace Sink::Storage; | |||
46 | class Pipeline::Private | 47 | class Pipeline::Private |
47 | { | 48 | { |
48 | public: | 49 | public: |
49 | Private(const ResourceContext &context) : resourceContext(context), storage(Sink::storageLocation(), context.instanceId(), DataStore::ReadWrite), revisionChanged(false) | 50 | Private(const ResourceContext &context) : resourceContext(context), entityStore(context), revisionChanged(false) |
50 | { | 51 | { |
51 | } | 52 | } |
52 | 53 | ||
53 | ResourceContext resourceContext; | 54 | ResourceContext resourceContext; |
54 | DataStore storage; | 55 | Storage::EntityStore entityStore; |
55 | DataStore::Transaction transaction; | ||
56 | QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors; | 56 | QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors; |
57 | bool revisionChanged; | 57 | bool revisionChanged; |
58 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); | ||
59 | QTime transactionTime; | 58 | QTime transactionTime; |
60 | int transactionItemCount; | 59 | int transactionItemCount; |
61 | }; | 60 | }; |
62 | 61 | ||
63 | void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) | ||
64 | { | ||
65 | SinkTrace() << "Committing new revision: " << uid << newRevision; | ||
66 | DataStore::mainDatabase(transaction, bufferType) | ||
67 | .write(DataStore::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), | ||
68 | [uid, newRevision](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; }); | ||
69 | revisionChanged = true; | ||
70 | DataStore::setMaxRevision(transaction, newRevision); | ||
71 | DataStore::recordRevision(transaction, newRevision, uid, bufferType); | ||
72 | } | ||
73 | |||
74 | 62 | ||
75 | Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Private(context)) | 63 | Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Private(context)) |
76 | { | 64 | { |
@@ -78,7 +66,6 @@ Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Pri | |||
78 | 66 | ||
79 | Pipeline::~Pipeline() | 67 | Pipeline::~Pipeline() |
80 | { | 68 | { |
81 | d->transaction = DataStore::Transaction(); | ||
82 | } | 69 | } |
83 | 70 | ||
84 | void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) | 71 | void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) |
@@ -98,27 +85,10 @@ void Pipeline::startTransaction() | |||
98 | // for (auto processor : d->processors[bufferType]) { | 85 | // for (auto processor : d->processors[bufferType]) { |
99 | // processor->startBatch(); | 86 | // processor->startBatch(); |
100 | // } | 87 | // } |
101 | if (d->transaction) { | ||
102 | return; | ||
103 | } | ||
104 | SinkTrace() << "Starting transaction."; | 88 | SinkTrace() << "Starting transaction."; |
105 | d->transactionTime.start(); | 89 | d->transactionTime.start(); |
106 | d->transactionItemCount = 0; | 90 | d->transactionItemCount = 0; |
107 | d->transaction = storage().createTransaction(DataStore::ReadWrite, [](const DataStore::Error &error) { | 91 | d->entityStore.startTransaction(DataStore::ReadWrite); |
108 | SinkWarning() << error.message; | ||
109 | }); | ||
110 | |||
111 | //FIXME this is a temporary measure to recover from a failure to open the named databases correctly. | ||
112 | //Once the actual problem is fixed it will be enough to simply crash if we open the wrong database (which we check in openDatabase already). | ||
113 | //It seems like the validateNamedDatabase calls actually stops the mdb_put failures during sync... | ||
114 | if (d->storage.exists()) { | ||
115 | while (!d->transaction.validateNamedDatabases()) { | ||
116 | SinkWarning() << "Opened an invalid transaction!!!!!!"; | ||
117 | d->transaction = storage().createTransaction(DataStore::ReadWrite, [](const DataStore::Error &error) { | ||
118 | SinkWarning() << error.message; | ||
119 | }); | ||
120 | } | ||
121 | } | ||
122 | } | 92 | } |
123 | 93 | ||
124 | void Pipeline::commit() | 94 | void Pipeline::commit() |
@@ -129,34 +99,20 @@ void Pipeline::commit() | |||
129 | // processor->finalize(); | 99 | // processor->finalize(); |
130 | // } | 100 | // } |
131 | if (!d->revisionChanged) { | 101 | if (!d->revisionChanged) { |
132 | d->transaction.abort(); | 102 | d->entityStore.abortTransaction(); |
133 | d->transaction = DataStore::Transaction(); | ||
134 | return; | 103 | return; |
135 | } | 104 | } |
136 | const auto revision = DataStore::maxRevision(d->transaction); | 105 | const auto revision = d->entityStore.maxRevision(); |
137 | const auto elapsed = d->transactionTime.elapsed(); | 106 | const auto elapsed = d->transactionTime.elapsed(); |
138 | SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " | 107 | SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " |
139 | << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; | 108 | << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; |
140 | if (d->transaction) { | 109 | d->entityStore.commitTransaction(); |
141 | d->transaction.commit(); | ||
142 | } | ||
143 | d->transaction = DataStore::Transaction(); | ||
144 | if (d->revisionChanged) { | 110 | if (d->revisionChanged) { |
145 | d->revisionChanged = false; | 111 | d->revisionChanged = false; |
146 | emit revisionUpdated(revision); | 112 | emit revisionUpdated(revision); |
147 | } | 113 | } |
148 | } | 114 | } |
149 | 115 | ||
150 | DataStore::Transaction &Pipeline::transaction() | ||
151 | { | ||
152 | return d->transaction; | ||
153 | } | ||
154 | |||
155 | DataStore &Pipeline::storage() const | ||
156 | { | ||
157 | return d->storage; | ||
158 | } | ||
159 | |||
160 | KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | 116 | KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) |
161 | { | 117 | { |
162 | d->transactionItemCount++; | 118 | d->transactionItemCount++; |
@@ -175,7 +131,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
175 | QByteArray key; | 131 | QByteArray key; |
176 | if (createEntity->entityId()) { | 132 | if (createEntity->entityId()) { |
177 | key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); | 133 | key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); |
178 | if (DataStore::mainDatabase(d->transaction, bufferType).contains(key)) { | 134 | if (d->entityStore.contains(bufferType, key)) { |
179 | SinkError() << "An entity with this id already exists: " << key; | 135 | SinkError() << "An entity with this id already exists: " << key; |
180 | return KAsync::error<qint64>(0); | 136 | return KAsync::error<qint64>(0); |
181 | } | 137 | } |
@@ -208,29 +164,23 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
208 | 164 | ||
209 | auto adaptor = adaptorFactory->createAdaptor(*entity); | 165 | auto adaptor = adaptorFactory->createAdaptor(*entity); |
210 | auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); | 166 | auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); |
211 | foreach (const auto &processor, d->processors[bufferType]) { | ||
212 | processor->newEntity(key, DataStore::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction); | ||
213 | } | ||
214 | //The maxRevision may have changed meanwhile if the entity created sub-entities | ||
215 | const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; | ||
216 | |||
217 | // Add metadata buffer | ||
218 | flatbuffers::FlatBufferBuilder metadataFbb; | ||
219 | auto metadataBuilder = MetadataBuilder(metadataFbb); | ||
220 | metadataBuilder.add_revision(newRevision); | ||
221 | metadataBuilder.add_operation(Operation_Creation); | ||
222 | metadataBuilder.add_replayToSource(replayToSource); | ||
223 | auto metadataBuffer = metadataBuilder.Finish(); | ||
224 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
225 | 167 | ||
226 | flatbuffers::FlatBufferBuilder fbb; | 168 | d->revisionChanged = true; |
227 | adaptorFactory->createBuffer(memoryAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | 169 | auto revision = d->entityStore.maxRevision(); |
170 | auto o = Sink::ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), key, revision, memoryAdaptor}; | ||
171 | o.setChangedProperties(o.availableProperties().toSet()); | ||
228 | 172 | ||
229 | d->storeNewRevision(newRevision, fbb, bufferType, key); | 173 | auto preprocess = [&, this](ApplicationDomain::ApplicationDomainType &newEntity) { |
174 | foreach (const auto &processor, d->processors[bufferType]) { | ||
175 | processor->newEntity(newEntity); | ||
176 | } | ||
177 | }; | ||
230 | 178 | ||
231 | //FIXME entityStore->create(bufferType, memoryAdaptor, replayToSource) | 179 | if (!d->entityStore.add(bufferType, o, replayToSource, preprocess)) { |
180 | return KAsync::error<qint64>(0); | ||
181 | } | ||
232 | 182 | ||
233 | return KAsync::value(newRevision); | 183 | return KAsync::value(d->entityStore.maxRevision()); |
234 | } | 184 | } |
235 | 185 | ||
236 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | 186 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) |
@@ -254,6 +204,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
254 | } | 204 | } |
255 | const qint64 baseRevision = modifyEntity->revision(); | 205 | const qint64 baseRevision = modifyEntity->revision(); |
256 | const bool replayToSource = modifyEntity->replayToSource(); | 206 | const bool replayToSource = modifyEntity->replayToSource(); |
207 | |||
257 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); | 208 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); |
258 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); | 209 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); |
259 | SinkTrace() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; | 210 | SinkTrace() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; |
@@ -269,7 +220,6 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
269 | } | 220 | } |
270 | } | 221 | } |
271 | 222 | ||
272 | // TODO use only readPropertyMapper and writePropertyMapper | ||
273 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); | 223 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); |
274 | if (!adaptorFactory) { | 224 | if (!adaptorFactory) { |
275 | SinkWarning() << "no adaptor factory for type " << bufferType; | 225 | SinkWarning() << "no adaptor factory for type " << bufferType; |
@@ -278,72 +228,26 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
278 | 228 | ||
279 | auto diffEntity = GetEntity(modifyEntity->delta()->Data()); | 229 | auto diffEntity = GetEntity(modifyEntity->delta()->Data()); |
280 | Q_ASSERT(diffEntity); | 230 | Q_ASSERT(diffEntity); |
281 | auto diff = adaptorFactory->createAdaptor(*diffEntity); | 231 | Sink::ApplicationDomain::ApplicationDomainType diff{d->resourceContext.instanceId(), key, baseRevision, adaptorFactory->createAdaptor(*diffEntity)}; |
282 | 232 | diff.setChangedProperties(changeset.toSet()); | |
283 | QSharedPointer<ApplicationDomain::BufferAdaptor> current; | ||
284 | DataStore::mainDatabase(d->transaction, bufferType) | ||
285 | .findLatest(key, | ||
286 | [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | ||
287 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
288 | if (!buffer.isValid()) { | ||
289 | SinkWarning() << "Read invalid buffer from disk"; | ||
290 | } else { | ||
291 | current = adaptorFactory->createAdaptor(buffer.entity()); | ||
292 | } | ||
293 | return false; | ||
294 | }, | ||
295 | [baseRevision](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; }); | ||
296 | |||
297 | if (!current) { | ||
298 | SinkWarning() << "Failed to read local value " << key; | ||
299 | return KAsync::error<qint64>(0); | ||
300 | } | ||
301 | |||
302 | auto newAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(current), current->availableProperties()); | ||
303 | |||
304 | // Apply diff | ||
305 | // FIXME only apply the properties that are available in the buffer | ||
306 | SinkTrace() << "Applying changed properties: " << changeset; | ||
307 | for (const auto &property : changeset) { | ||
308 | const auto value = diff->getProperty(property); | ||
309 | if (value.isValid()) { | ||
310 | newAdaptor->setProperty(property, value); | ||
311 | } | ||
312 | } | ||
313 | 233 | ||
314 | // Remove deletions | 234 | QByteArrayList deletions; |
315 | if (modifyEntity->deletions()) { | 235 | if (modifyEntity->deletions()) { |
316 | for (const flatbuffers::String *property : *modifyEntity->deletions()) { | 236 | deletions = BufferUtils::fromVector(*modifyEntity->deletions()); |
317 | newAdaptor->setProperty(BufferUtils::extractBuffer(property), QVariant()); | ||
318 | } | ||
319 | } | 237 | } |
320 | 238 | ||
321 | newAdaptor->resetChangedProperties(); | 239 | auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) { |
322 | foreach (const auto &processor, d->processors[bufferType]) { | 240 | foreach (const auto &processor, d->processors[bufferType]) { |
323 | processor->modifiedEntity(key, DataStore::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction); | 241 | processor->modifiedEntity(oldEntity, newEntity); |
324 | } | 242 | } |
325 | //The maxRevision may have changed meanwhile if the entity created sub-entities | 243 | }; |
326 | const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; | ||
327 | 244 | ||
328 | // Add metadata buffer | 245 | d->revisionChanged = true; |
329 | flatbuffers::FlatBufferBuilder metadataFbb; | 246 | if (!d->entityStore.modify(bufferType, diff, deletions, replayToSource, preprocess)) { |
330 | { | 247 | return KAsync::error<qint64>(0); |
331 | //We add availableProperties to account for the properties that have been changed by the preprocessors | ||
332 | auto modifiedProperties = BufferUtils::toVector(metadataFbb, changeset + newAdaptor->changedProperties()); | ||
333 | auto metadataBuilder = MetadataBuilder(metadataFbb); | ||
334 | metadataBuilder.add_revision(newRevision); | ||
335 | metadataBuilder.add_operation(Operation_Modification); | ||
336 | metadataBuilder.add_replayToSource(replayToSource); | ||
337 | metadataBuilder.add_modifiedProperties(modifiedProperties); | ||
338 | auto metadataBuffer = metadataBuilder.Finish(); | ||
339 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
340 | } | 248 | } |
341 | 249 | ||
342 | flatbuffers::FlatBufferBuilder fbb; | 250 | return KAsync::value(d->entityStore.maxRevision()); |
343 | adaptorFactory->createBuffer(newAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | ||
344 | |||
345 | d->storeNewRevision(newRevision, fbb, bufferType, key); | ||
346 | return KAsync::value(newRevision); | ||
347 | } | 251 | } |
348 | 252 | ||
349 | KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | 253 | KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) |
@@ -364,106 +268,38 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
364 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); | 268 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); |
365 | SinkTrace() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; | 269 | SinkTrace() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; |
366 | 270 | ||
367 | bool found = false; | 271 | auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity) { |
368 | bool alreadyRemoved = false; | 272 | foreach (const auto &processor, d->processors[bufferType]) { |
369 | DataStore::mainDatabase(d->transaction, bufferType) | 273 | processor->deletedEntity(oldEntity); |
370 | .findLatest(key, | 274 | } |
371 | [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { | 275 | }; |
372 | auto entity = GetEntity(data.data()); | ||
373 | if (entity && entity->metadata()) { | ||
374 | auto metadata = GetMetadata(entity->metadata()->Data()); | ||
375 | found = true; | ||
376 | if (metadata->operation() == Operation_Removal) { | ||
377 | alreadyRemoved = true; | ||
378 | } | ||
379 | } | ||
380 | return false; | ||
381 | }, | ||
382 | [](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; }); | ||
383 | |||
384 | if (!found) { | ||
385 | SinkWarning() << "Failed to find entity " << key; | ||
386 | return KAsync::error<qint64>(0); | ||
387 | } | ||
388 | if (alreadyRemoved) { | ||
389 | SinkWarning() << "Entity is already removed " << key; | ||
390 | return KAsync::error<qint64>(0); | ||
391 | } | ||
392 | |||
393 | const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; | ||
394 | |||
395 | // Add metadata buffer | ||
396 | flatbuffers::FlatBufferBuilder metadataFbb; | ||
397 | auto metadataBuilder = MetadataBuilder(metadataFbb); | ||
398 | metadataBuilder.add_revision(newRevision); | ||
399 | metadataBuilder.add_operation(Operation_Removal); | ||
400 | metadataBuilder.add_replayToSource(replayToSource); | ||
401 | auto metadataBuffer = metadataBuilder.Finish(); | ||
402 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
403 | |||
404 | flatbuffers::FlatBufferBuilder fbb; | ||
405 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); | ||
406 | 276 | ||
407 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); | 277 | d->revisionChanged = true; |
408 | if (!adaptorFactory) { | 278 | if (!d->entityStore.remove(bufferType, key, replayToSource, preprocess)) { |
409 | SinkWarning() << "no adaptor factory for type " << bufferType; | ||
410 | return KAsync::error<qint64>(0); | 279 | return KAsync::error<qint64>(0); |
411 | } | 280 | } |
412 | 281 | ||
413 | QSharedPointer<ApplicationDomain::BufferAdaptor> current; | 282 | return KAsync::value(d->entityStore.maxRevision()); |
414 | DataStore::mainDatabase(d->transaction, bufferType) | ||
415 | .findLatest(key, | ||
416 | [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { | ||
417 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
418 | if (!buffer.isValid()) { | ||
419 | SinkWarning() << "Read invalid buffer from disk"; | ||
420 | } else { | ||
421 | current = adaptorFactory->createAdaptor(buffer.entity()); | ||
422 | } | ||
423 | return false; | ||
424 | }, | ||
425 | [this](const DataStore::Error &error) { SinkError() << "Failed to find value in pipeline: " << error.message; }); | ||
426 | |||
427 | d->storeNewRevision(newRevision, fbb, bufferType, key); | ||
428 | |||
429 | foreach (const auto &processor, d->processors[bufferType]) { | ||
430 | processor->deletedEntity(key, newRevision, *current, d->transaction); | ||
431 | } | ||
432 | |||
433 | return KAsync::value(newRevision); | ||
434 | } | 283 | } |
435 | 284 | ||
436 | void Pipeline::cleanupRevision(qint64 revision) | 285 | void Pipeline::cleanupRevision(qint64 revision) |
437 | { | 286 | { |
287 | d->entityStore.cleanupRevision(revision); | ||
438 | d->revisionChanged = true; | 288 | d->revisionChanged = true; |
439 | const auto uid = DataStore::getUidFromRevision(d->transaction, revision); | ||
440 | const auto bufferType = DataStore::getTypeFromRevision(d->transaction, revision); | ||
441 | SinkTrace() << "Cleaning up revision " << revision << uid << bufferType; | ||
442 | DataStore::mainDatabase(d->transaction, bufferType) | ||
443 | .scan(uid, | ||
444 | [&](const QByteArray &key, const QByteArray &data) -> bool { | ||
445 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
446 | if (!buffer.isValid()) { | ||
447 | SinkWarning() << "Read invalid buffer from disk"; | ||
448 | } else { | ||
449 | const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer()); | ||
450 | const qint64 rev = metadata->revision(); | ||
451 | // Remove old revisions, and the current if the entity has already been removed | ||
452 | if (rev < revision || metadata->operation() == Operation_Removal) { | ||
453 | DataStore::removeRevision(d->transaction, rev); | ||
454 | DataStore::mainDatabase(d->transaction, bufferType).remove(key); | ||
455 | } | ||
456 | } | ||
457 | |||
458 | return true; | ||
459 | }, | ||
460 | [](const DataStore::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true); | ||
461 | DataStore::setCleanedUpRevision(d->transaction, revision); | ||
462 | } | 289 | } |
463 | 290 | ||
464 | qint64 Pipeline::cleanedUpRevision() | 291 | qint64 Pipeline::cleanedUpRevision() |
465 | { | 292 | { |
466 | return DataStore::cleanedUpRevision(d->transaction); | 293 | /* return d->entityStore.cleanedUpRevision(); */ |
294 | /* return DataStore::cleanedUpRevision(d->transaction); */ | ||
295 | //FIXME Just move the whole cleanup revision iteration into the entitystore | ||
296 | return 0; | ||
297 | } | ||
298 | |||
299 | qint64 Pipeline::revision() | ||
300 | { | ||
301 | //FIXME Just move the whole cleanup revision iteration into the entitystore | ||
302 | return 0; | ||
467 | } | 303 | } |
468 | 304 | ||
469 | class Preprocessor::Private { | 305 | class Preprocessor::Private { |
@@ -492,7 +328,7 @@ void Preprocessor::startBatch() | |||
492 | { | 328 | { |
493 | } | 329 | } |
494 | 330 | ||
495 | void Preprocessor::finalize() | 331 | void Preprocessor::finalizeBatch() |
496 | { | 332 | { |
497 | } | 333 | } |
498 | 334 | ||
@@ -510,7 +346,6 @@ void Preprocessor::createEntity(const Sink::ApplicationDomain::ApplicationDomain | |||
510 | 346 | ||
511 | flatbuffers::FlatBufferBuilder fbb; | 347 | flatbuffers::FlatBufferBuilder fbb; |
512 | auto entityId = fbb.CreateString(entity.identifier()); | 348 | auto entityId = fbb.CreateString(entity.identifier()); |
513 | // This is the resource buffer type and not the domain type | ||
514 | auto type = fbb.CreateString(typeName); | 349 | auto type = fbb.CreateString(typeName); |
515 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityBuffer.constData(), entityBuffer.size()); | 350 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityBuffer.constData(), entityBuffer.size()); |
516 | auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta); | 351 | auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta); |
diff --git a/common/pipeline.h b/common/pipeline.h index bf94017..0461507 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -31,7 +31,8 @@ | |||
31 | 31 | ||
32 | #include <Async/Async> | 32 | #include <Async/Async> |
33 | 33 | ||
34 | #include "domainadaptor.h" | 34 | #include <bufferadaptor.h> |
35 | #include <resourcecontext.h> | ||
35 | 36 | ||
36 | namespace Sink { | 37 | namespace Sink { |
37 | 38 | ||
@@ -45,16 +46,14 @@ public: | |||
45 | Pipeline(const ResourceContext &context); | 46 | Pipeline(const ResourceContext &context); |
46 | ~Pipeline(); | 47 | ~Pipeline(); |
47 | 48 | ||
48 | Storage::DataStore &storage() const; | ||
49 | |||
50 | void setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &preprocessors); | 49 | void setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &preprocessors); |
51 | void startTransaction(); | 50 | void startTransaction(); |
52 | void commit(); | 51 | void commit(); |
53 | Storage::DataStore::Transaction &transaction(); | ||
54 | 52 | ||
55 | KAsync::Job<qint64> newEntity(void const *command, size_t size); | 53 | KAsync::Job<qint64> newEntity(void const *command, size_t size); |
56 | KAsync::Job<qint64> modifiedEntity(void const *command, size_t size); | 54 | KAsync::Job<qint64> modifiedEntity(void const *command, size_t size); |
57 | KAsync::Job<qint64> deletedEntity(void const *command, size_t size); | 55 | KAsync::Job<qint64> deletedEntity(void const *command, size_t size); |
56 | |||
58 | /* | 57 | /* |
59 | * Cleans up a single revision. | 58 | * Cleans up a single revision. |
60 | * | 59 | * |
@@ -66,6 +65,7 @@ public: | |||
66 | * Returns the latest cleaned up revision. | 65 | * Returns the latest cleaned up revision. |
67 | */ | 66 | */ |
68 | qint64 cleanedUpRevision(); | 67 | qint64 cleanedUpRevision(); |
68 | qint64 revision(); | ||
69 | 69 | ||
70 | signals: | 70 | signals: |
71 | void revisionUpdated(qint64); | 71 | void revisionUpdated(qint64); |
@@ -82,11 +82,10 @@ public: | |||
82 | virtual ~Preprocessor(); | 82 | virtual ~Preprocessor(); |
83 | 83 | ||
84 | virtual void startBatch(); | 84 | virtual void startBatch(); |
85 | virtual void newEntity(const QByteArray &uid, qint64 revision, ApplicationDomain::BufferAdaptor &newEntity, Storage::DataStore::Transaction &transaction) {}; | 85 | virtual void newEntity(ApplicationDomain::ApplicationDomainType &newEntity) {}; |
86 | virtual void modifiedEntity(const QByteArray &uid, qint64 revision, const ApplicationDomain::BufferAdaptor &oldEntity, | 86 | virtual void modifiedEntity(const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) {}; |
87 | ApplicationDomain::BufferAdaptor &newEntity, Storage::DataStore::Transaction &transaction) {}; | 87 | virtual void deletedEntity(const ApplicationDomain::ApplicationDomainType &oldEntity) {}; |
88 | virtual void deletedEntity(const QByteArray &uid, qint64 revision, const ApplicationDomain::BufferAdaptor &oldEntity, Storage::DataStore::Transaction &transaction) {}; | 88 | virtual void finalizeBatch(); |
89 | virtual void finalize(); | ||
90 | 89 | ||
91 | void setup(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Pipeline *); | 90 | void setup(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Pipeline *); |
92 | 91 | ||
@@ -110,27 +109,28 @@ template<typename DomainType> | |||
110 | class SINK_EXPORT EntityPreprocessor: public Preprocessor | 109 | class SINK_EXPORT EntityPreprocessor: public Preprocessor |
111 | { | 110 | { |
112 | public: | 111 | public: |
113 | virtual void newEntity(DomainType &, Storage::DataStore::Transaction &transaction) {}; | 112 | virtual void newEntity(DomainType &) {}; |
114 | virtual void modifiedEntity(const DomainType &oldEntity, DomainType &newEntity, Storage::DataStore::Transaction &transaction) {}; | 113 | virtual void modifiedEntity(const DomainType &oldEntity, DomainType &newEntity) {}; |
115 | virtual void deletedEntity(const DomainType &oldEntity, Storage::DataStore::Transaction &transaction) {}; | 114 | virtual void deletedEntity(const DomainType &oldEntity) {}; |
116 | 115 | ||
117 | private: | 116 | private: |
118 | static void nullDeleter(ApplicationDomain::BufferAdaptor *) {} | 117 | virtual void newEntity(ApplicationDomain::ApplicationDomainType &newEntity_) Q_DECL_OVERRIDE |
119 | virtual void newEntity(const QByteArray &uid, qint64 revision, ApplicationDomain::BufferAdaptor &bufferAdaptor, Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | ||
120 | { | 118 | { |
121 | auto o = DomainType("", uid, revision, QSharedPointer<ApplicationDomain::BufferAdaptor>(&bufferAdaptor, nullDeleter)); | 119 | //Modifications still work due to the underlying shared adaptor |
122 | newEntity(o, transaction); | 120 | auto newEntityCopy = DomainType(newEntity_); |
121 | newEntity(newEntityCopy); | ||
123 | } | 122 | } |
124 | 123 | ||
125 | virtual void modifiedEntity(const QByteArray &uid, qint64 revision, const ApplicationDomain::BufferAdaptor &oldEntity, | 124 | virtual void modifiedEntity(const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity_) Q_DECL_OVERRIDE |
126 | ApplicationDomain::BufferAdaptor &bufferAdaptor, Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | ||
127 | { | 125 | { |
128 | auto o = DomainType("", uid, revision, QSharedPointer<ApplicationDomain::BufferAdaptor>(&bufferAdaptor, nullDeleter)); | 126 | //Modifications still work due to the underlying shared adaptor |
129 | modifiedEntity(DomainType("", uid, 0, QSharedPointer<ApplicationDomain::BufferAdaptor>(const_cast<ApplicationDomain::BufferAdaptor*>(&oldEntity), nullDeleter)), o, transaction); | 127 | auto newEntityCopy = DomainType(newEntity_); |
128 | modifiedEntity(DomainType(oldEntity), newEntityCopy); | ||
130 | } | 129 | } |
131 | virtual void deletedEntity(const QByteArray &uid, qint64 revision, const ApplicationDomain::BufferAdaptor &bufferAdaptor, Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE | 130 | |
131 | virtual void deletedEntity(const ApplicationDomain::ApplicationDomainType &oldEntity) Q_DECL_OVERRIDE | ||
132 | { | 132 | { |
133 | deletedEntity(DomainType("", uid, revision, QSharedPointer<ApplicationDomain::BufferAdaptor>(const_cast<ApplicationDomain::BufferAdaptor*>(&bufferAdaptor), nullDeleter)), transaction); | 133 | deletedEntity(DomainType(oldEntity)); |
134 | } | 134 | } |
135 | }; | 135 | }; |
136 | 136 | ||
diff --git a/common/specialpurposepreprocessor.cpp b/common/specialpurposepreprocessor.cpp index 0fd8e34..920f78a 100644 --- a/common/specialpurposepreprocessor.cpp +++ b/common/specialpurposepreprocessor.cpp | |||
@@ -46,11 +46,11 @@ QByteArray getSpecialPurposeType(const QString &name) | |||
46 | 46 | ||
47 | SpecialPurposeProcessor::SpecialPurposeProcessor(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier) {} | 47 | SpecialPurposeProcessor::SpecialPurposeProcessor(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier) {} |
48 | 48 | ||
49 | QByteArray SpecialPurposeProcessor::ensureFolder(Sink::Storage::DataStore::Transaction &transaction, const QByteArray &specialPurpose) | 49 | QByteArray SpecialPurposeProcessor::ensureFolder(const QByteArray &specialPurpose) |
50 | { | 50 | { |
51 | /* if (!mSpecialPurposeFolders.contains(specialPurpose)) { */ | 51 | /* if (!mSpecialPurposeFolders.contains(specialPurpose)) { */ |
52 | /* //Try to find an existing drafts folder */ | 52 | /* //Try to find an existing drafts folder */ |
53 | /* Sink::EntityReader<ApplicationDomain::Folder> reader(mResourceType, mResourceInstanceIdentifier, transaction); */ | 53 | /* Sink::EntityReader<ApplicationDomain::Folder> reader(mResourceType, mResourceInstanceIdentifier); */ |
54 | /* reader.query(Sink::Query().filter<ApplicationDomain::Folder::SpecialPurpose>(Query::Comparator(specialPurpose, Query::Comparator::Contains)), */ | 54 | /* reader.query(Sink::Query().filter<ApplicationDomain::Folder::SpecialPurpose>(Query::Comparator(specialPurpose, Query::Comparator::Contains)), */ |
55 | /* [this, specialPurpose](const ApplicationDomain::Folder &f) -> bool{ */ | 55 | /* [this, specialPurpose](const ApplicationDomain::Folder &f) -> bool{ */ |
56 | /* mSpecialPurposeFolders.insert(specialPurpose, f.identifier()); */ | 56 | /* mSpecialPurposeFolders.insert(specialPurpose, f.identifier()); */ |
@@ -70,23 +70,23 @@ QByteArray SpecialPurposeProcessor::ensureFolder(Sink::Storage::DataStore::Trans | |||
70 | return mSpecialPurposeFolders.value(specialPurpose); | 70 | return mSpecialPurposeFolders.value(specialPurpose); |
71 | } | 71 | } |
72 | 72 | ||
73 | void SpecialPurposeProcessor::moveToFolder(Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) | 73 | void SpecialPurposeProcessor::moveToFolder(Sink::ApplicationDomain::ApplicationDomainType &newEntity) |
74 | { | 74 | { |
75 | if (newEntity.getProperty("trash").toBool()) { | 75 | if (newEntity.getProperty("trash").toBool()) { |
76 | newEntity.setProperty("folder", ensureFolder(transaction, "trash")); | 76 | newEntity.setProperty("folder", ensureFolder("trash")); |
77 | return; | 77 | return; |
78 | } | 78 | } |
79 | if (newEntity.getProperty("draft").toBool()) { | 79 | if (newEntity.getProperty("draft").toBool()) { |
80 | newEntity.setProperty("folder", ensureFolder(transaction, "drafts")); | 80 | newEntity.setProperty("folder", ensureFolder("drafts")); |
81 | } | 81 | } |
82 | } | 82 | } |
83 | 83 | ||
84 | void SpecialPurposeProcessor::newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) | 84 | void SpecialPurposeProcessor::newEntity(Sink::ApplicationDomain::ApplicationDomainType &newEntity) |
85 | { | 85 | { |
86 | moveToFolder(newEntity, transaction); | 86 | moveToFolder(newEntity); |
87 | } | 87 | } |
88 | 88 | ||
89 | void SpecialPurposeProcessor::modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) | 89 | void SpecialPurposeProcessor::modifiedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity, Sink::ApplicationDomain::ApplicationDomainType &newEntity) |
90 | { | 90 | { |
91 | moveToFolder(newEntity, transaction); | 91 | moveToFolder(newEntity); |
92 | } | 92 | } |
diff --git a/common/specialpurposepreprocessor.h b/common/specialpurposepreprocessor.h index 8b2d9e9..f2aeb20 100644 --- a/common/specialpurposepreprocessor.h +++ b/common/specialpurposepreprocessor.h | |||
@@ -30,12 +30,12 @@ class SINK_EXPORT SpecialPurposeProcessor : public Sink::Preprocessor | |||
30 | public: | 30 | public: |
31 | SpecialPurposeProcessor(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier); | 31 | SpecialPurposeProcessor(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier); |
32 | 32 | ||
33 | QByteArray ensureFolder(Sink::Storage::DataStore::Transaction &transaction, const QByteArray &specialPurpose); | 33 | QByteArray ensureFolder(const QByteArray &specialPurpose); |
34 | 34 | ||
35 | void moveToFolder(Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction); | 35 | void moveToFolder(Sink::ApplicationDomain::ApplicationDomainType &newEntity); |
36 | 36 | ||
37 | void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE; | 37 | void newEntity(Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE; |
38 | void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE; | 38 | void modifiedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity, Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE; |
39 | 39 | ||
40 | QHash<QByteArray, QByteArray> mSpecialPurposeFolders; | 40 | QHash<QByteArray, QByteArray> mSpecialPurposeFolders; |
41 | QByteArray mResourceType; | 41 | QByteArray mResourceType; |
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp index fe63f0b..30c7a71 100644 --- a/common/storage/entitystore.cpp +++ b/common/storage/entitystore.cpp | |||
@@ -25,6 +25,8 @@ | |||
25 | #include "definitions.h" | 25 | #include "definitions.h" |
26 | #include "resourcecontext.h" | 26 | #include "resourcecontext.h" |
27 | #include "index.h" | 27 | #include "index.h" |
28 | #include "bufferutils.h" | ||
29 | #include "entity_generated.h" | ||
28 | 30 | ||
29 | #include "mail.h" | 31 | #include "mail.h" |
30 | #include "folder.h" | 32 | #include "folder.h" |
@@ -108,16 +110,199 @@ void EntityStore::startTransaction(Sink::Storage::DataStore::AccessMode accessMo | |||
108 | 110 | ||
109 | void EntityStore::commitTransaction() | 111 | void EntityStore::commitTransaction() |
110 | { | 112 | { |
113 | SinkTrace() << "Committing transaction"; | ||
111 | d->transaction.commit(); | 114 | d->transaction.commit(); |
112 | d->transaction = Storage::DataStore::Transaction(); | 115 | d->transaction = Storage::DataStore::Transaction(); |
113 | } | 116 | } |
114 | 117 | ||
115 | void EntityStore::abortTransaction() | 118 | void EntityStore::abortTransaction() |
116 | { | 119 | { |
120 | SinkTrace() << "Aborting transaction"; | ||
117 | d->transaction.abort(); | 121 | d->transaction.abort(); |
118 | d->transaction = Storage::DataStore::Transaction(); | 122 | d->transaction = Storage::DataStore::Transaction(); |
119 | } | 123 | } |
120 | 124 | ||
125 | bool EntityStore::add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &entity_, bool replayToSource, const PreprocessCreation &preprocess) | ||
126 | { | ||
127 | if (entity_.identifier().isEmpty()) { | ||
128 | SinkWarning() << "Can't write entity with an empty identifier"; | ||
129 | return false; | ||
130 | } | ||
131 | |||
132 | auto entity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(entity_, entity_.availableProperties()); | ||
133 | entity.setChangedProperties(entity.availableProperties().toSet()); | ||
134 | |||
135 | preprocess(entity); | ||
136 | d->typeIndex(type).add(entity.identifier(), entity, d->transaction); | ||
137 | |||
138 | //The maxRevision may have changed meanwhile if the entity created sub-entities | ||
139 | const qint64 newRevision = maxRevision() + 1; | ||
140 | |||
141 | // Add metadata buffer | ||
142 | flatbuffers::FlatBufferBuilder metadataFbb; | ||
143 | auto metadataBuilder = MetadataBuilder(metadataFbb); | ||
144 | metadataBuilder.add_revision(newRevision); | ||
145 | metadataBuilder.add_operation(Operation_Creation); | ||
146 | metadataBuilder.add_replayToSource(replayToSource); | ||
147 | auto metadataBuffer = metadataBuilder.Finish(); | ||
148 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
149 | |||
150 | flatbuffers::FlatBufferBuilder fbb; | ||
151 | d->resourceContext.adaptorFactory(type).createBuffer(entity, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | ||
152 | |||
153 | DataStore::mainDatabase(d->transaction, type) | ||
154 | .write(DataStore::assembleKey(entity.identifier(), newRevision), BufferUtils::extractBuffer(fbb), | ||
155 | [&](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << entity.identifier() << newRevision; }); | ||
156 | DataStore::setMaxRevision(d->transaction, newRevision); | ||
157 | DataStore::recordRevision(d->transaction, newRevision, entity.identifier(), type); | ||
158 | SinkTrace() << "Wrote entity: " << entity.identifier() << type << newRevision; | ||
159 | return true; | ||
160 | } | ||
161 | |||
162 | bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource, const PreprocessModification &preprocess) | ||
163 | { | ||
164 | auto changeset = diff.changedProperties(); | ||
165 | //TODO handle errors | ||
166 | const auto current = readLatest(type, diff.identifier()); | ||
167 | if (current.identifier().isEmpty()) { | ||
168 | SinkWarning() << "Failed to read current version: " << diff.identifier(); | ||
169 | return false; | ||
170 | } | ||
171 | |||
172 | auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(current, current.availableProperties()); | ||
173 | |||
174 | // Apply diff | ||
175 | //SinkTrace() << "Applying changed properties: " << changeset; | ||
176 | for (const auto &property : changeset) { | ||
177 | const auto value = diff.getProperty(property); | ||
178 | if (value.isValid()) { | ||
179 | //SinkTrace() << "Setting property: " << property; | ||
180 | newEntity.setProperty(property, value); | ||
181 | } | ||
182 | } | ||
183 | |||
184 | // Remove deletions | ||
185 | for (const auto property : deletions) { | ||
186 | //SinkTrace() << "Removing property: " << property; | ||
187 | newEntity.setProperty(property, QVariant()); | ||
188 | } | ||
189 | |||
190 | preprocess(current, newEntity); | ||
191 | d->typeIndex(type).remove(current.identifier(), current, d->transaction); | ||
192 | d->typeIndex(type).add(newEntity.identifier(), newEntity, d->transaction); | ||
193 | |||
194 | const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; | ||
195 | |||
196 | // Add metadata buffer | ||
197 | flatbuffers::FlatBufferBuilder metadataFbb; | ||
198 | { | ||
199 | //We add availableProperties to account for the properties that have been changed by the preprocessors | ||
200 | auto modifiedProperties = BufferUtils::toVector(metadataFbb, changeset + newEntity.changedProperties()); | ||
201 | auto metadataBuilder = MetadataBuilder(metadataFbb); | ||
202 | metadataBuilder.add_revision(newRevision); | ||
203 | metadataBuilder.add_operation(Operation_Modification); | ||
204 | metadataBuilder.add_replayToSource(replayToSource); | ||
205 | metadataBuilder.add_modifiedProperties(modifiedProperties); | ||
206 | auto metadataBuffer = metadataBuilder.Finish(); | ||
207 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
208 | } | ||
209 | |||
210 | newEntity.setChangedProperties(newEntity.availableProperties().toSet()); | ||
211 | SinkTrace() << "All properties: " << newEntity.availableProperties(); | ||
212 | |||
213 | flatbuffers::FlatBufferBuilder fbb; | ||
214 | d->resourceContext.adaptorFactory(type).createBuffer(newEntity, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | ||
215 | |||
216 | DataStore::mainDatabase(d->transaction, type) | ||
217 | .write(DataStore::assembleKey(newEntity.identifier(), newRevision), BufferUtils::extractBuffer(fbb), | ||
218 | [&](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << newEntity.identifier() << newRevision; }); | ||
219 | DataStore::setMaxRevision(d->transaction, newRevision); | ||
220 | DataStore::recordRevision(d->transaction, newRevision, newEntity.identifier(), type); | ||
221 | SinkTrace() << "Wrote modified entity: " << newEntity.identifier() << type << newRevision; | ||
222 | return true; | ||
223 | } | ||
224 | |||
225 | bool EntityStore::remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &preprocess) | ||
226 | { | ||
227 | bool found = false; | ||
228 | bool alreadyRemoved = false; | ||
229 | DataStore::mainDatabase(d->transaction, type) | ||
230 | .findLatest(uid, | ||
231 | [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { | ||
232 | auto entity = GetEntity(data.data()); | ||
233 | if (entity && entity->metadata()) { | ||
234 | auto metadata = GetMetadata(entity->metadata()->Data()); | ||
235 | found = true; | ||
236 | if (metadata->operation() == Operation_Removal) { | ||
237 | alreadyRemoved = true; | ||
238 | } | ||
239 | } | ||
240 | return false; | ||
241 | }, | ||
242 | [](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; }); | ||
243 | |||
244 | if (!found) { | ||
245 | SinkWarning() << "Failed to find entity " << uid; | ||
246 | return false; | ||
247 | } | ||
248 | if (alreadyRemoved) { | ||
249 | SinkWarning() << "Entity is already removed " << uid; | ||
250 | return false; | ||
251 | } | ||
252 | |||
253 | const auto current = readLatest(type, uid); | ||
254 | preprocess(current); | ||
255 | d->typeIndex(type).remove(current.identifier(), current, d->transaction); | ||
256 | |||
257 | const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; | ||
258 | |||
259 | // Add metadata buffer | ||
260 | flatbuffers::FlatBufferBuilder metadataFbb; | ||
261 | auto metadataBuilder = MetadataBuilder(metadataFbb); | ||
262 | metadataBuilder.add_revision(newRevision); | ||
263 | metadataBuilder.add_operation(Operation_Removal); | ||
264 | metadataBuilder.add_replayToSource(replayToSource); | ||
265 | auto metadataBuffer = metadataBuilder.Finish(); | ||
266 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
267 | |||
268 | flatbuffers::FlatBufferBuilder fbb; | ||
269 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); | ||
270 | |||
271 | DataStore::mainDatabase(d->transaction, type) | ||
272 | .write(DataStore::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), | ||
273 | [&](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; }); | ||
274 | DataStore::setMaxRevision(d->transaction, newRevision); | ||
275 | DataStore::recordRevision(d->transaction, newRevision, uid, type); | ||
276 | return true; | ||
277 | } | ||
278 | |||
279 | void EntityStore::cleanupRevision(qint64 revision) | ||
280 | { | ||
281 | const auto uid = DataStore::getUidFromRevision(d->transaction, revision); | ||
282 | const auto bufferType = DataStore::getTypeFromRevision(d->transaction, revision); | ||
283 | SinkTrace() << "Cleaning up revision " << revision << uid << bufferType; | ||
284 | DataStore::mainDatabase(d->transaction, bufferType) | ||
285 | .scan(uid, | ||
286 | [&](const QByteArray &key, const QByteArray &data) -> bool { | ||
287 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
288 | if (!buffer.isValid()) { | ||
289 | SinkWarning() << "Read invalid buffer from disk"; | ||
290 | } else { | ||
291 | const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer()); | ||
292 | const qint64 rev = metadata->revision(); | ||
293 | // Remove old revisions, and the current if the entity has already been removed | ||
294 | if (rev < revision || metadata->operation() == Operation_Removal) { | ||
295 | DataStore::removeRevision(d->transaction, rev); | ||
296 | DataStore::mainDatabase(d->transaction, bufferType).remove(key); | ||
297 | } | ||
298 | } | ||
299 | |||
300 | return true; | ||
301 | }, | ||
302 | [](const DataStore::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true); | ||
303 | DataStore::setCleanedUpRevision(d->transaction, revision); | ||
304 | } | ||
305 | |||
121 | QVector<QByteArray> EntityStore::fullScan(const QByteArray &type) | 306 | QVector<QByteArray> EntityStore::fullScan(const QByteArray &type) |
122 | { | 307 | { |
123 | SinkTrace() << "Looking for : " << type; | 308 | SinkTrace() << "Looking for : " << type; |
diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h index 455e9c3..65bff50 100644 --- a/common/storage/entitystore.h +++ b/common/storage/entitystore.h | |||
@@ -38,9 +38,14 @@ public: | |||
38 | typedef QSharedPointer<EntityStore> Ptr; | 38 | typedef QSharedPointer<EntityStore> Ptr; |
39 | EntityStore(const ResourceContext &resourceContext); | 39 | EntityStore(const ResourceContext &resourceContext); |
40 | 40 | ||
41 | void add(const ApplicationDomain::ApplicationDomainType &); | 41 | typedef std::function<void(const ApplicationDomain::ApplicationDomainType &, ApplicationDomain::ApplicationDomainType &)> PreprocessModification; |
42 | void modify(const ApplicationDomain::ApplicationDomainType &); | 42 | typedef std::function<void(ApplicationDomain::ApplicationDomainType &)> PreprocessCreation; |
43 | void remove(const ApplicationDomain::ApplicationDomainType &); | 43 | typedef std::function<void(const ApplicationDomain::ApplicationDomainType &)> PreprocessRemoval; |
44 | |||
45 | bool add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &, bool replayToSource, const PreprocessCreation &); | ||
46 | bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &, const QByteArrayList &deletions, bool replayToSource, const PreprocessModification &); | ||
47 | bool remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &); | ||
48 | void cleanupRevision(qint64 revision); | ||
44 | 49 | ||
45 | void startTransaction(Sink::Storage::DataStore::AccessMode); | 50 | void startTransaction(Sink::Storage::DataStore::AccessMode); |
46 | void commitTransaction(); | 51 | void commitTransaction(); |
diff --git a/common/typeindex.cpp b/common/typeindex.cpp index 64c2a01..7920efc 100644 --- a/common/typeindex.cpp +++ b/common/typeindex.cpp | |||
@@ -108,30 +108,30 @@ void TypeIndex::addPropertyWithSorting<QByteArray, QDateTime>(const QByteArray & | |||
108 | mSortedProperties.insert(property, sortProperty); | 108 | mSortedProperties.insert(property, sortProperty); |
109 | } | 109 | } |
110 | 110 | ||
111 | void TypeIndex::add(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) | 111 | void TypeIndex::add(const QByteArray &identifier, const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Storage::DataStore::Transaction &transaction) |
112 | { | 112 | { |
113 | for (const auto &property : mProperties) { | 113 | for (const auto &property : mProperties) { |
114 | const auto value = bufferAdaptor.getProperty(property); | 114 | const auto value = entity.getProperty(property); |
115 | auto indexer = mIndexer.value(property); | 115 | auto indexer = mIndexer.value(property); |
116 | indexer(identifier, value, transaction); | 116 | indexer(identifier, value, transaction); |
117 | } | 117 | } |
118 | for (auto it = mSortedProperties.constBegin(); it != mSortedProperties.constEnd(); it++) { | 118 | for (auto it = mSortedProperties.constBegin(); it != mSortedProperties.constEnd(); it++) { |
119 | const auto value = bufferAdaptor.getProperty(it.key()); | 119 | const auto value = entity.getProperty(it.key()); |
120 | const auto sortValue = bufferAdaptor.getProperty(it.value()); | 120 | const auto sortValue = entity.getProperty(it.value()); |
121 | auto indexer = mSortIndexer.value(it.key() + it.value()); | 121 | auto indexer = mSortIndexer.value(it.key() + it.value()); |
122 | indexer(identifier, value, sortValue, transaction); | 122 | indexer(identifier, value, sortValue, transaction); |
123 | } | 123 | } |
124 | } | 124 | } |
125 | 125 | ||
126 | void TypeIndex::remove(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) | 126 | void TypeIndex::remove(const QByteArray &identifier, const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Storage::DataStore::Transaction &transaction) |
127 | { | 127 | { |
128 | for (const auto &property : mProperties) { | 128 | for (const auto &property : mProperties) { |
129 | const auto value = bufferAdaptor.getProperty(property); | 129 | const auto value = entity.getProperty(property); |
130 | Index(indexName(property), transaction).remove(getByteArray(value), identifier); | 130 | Index(indexName(property), transaction).remove(getByteArray(value), identifier); |
131 | } | 131 | } |
132 | for (auto it = mSortedProperties.constBegin(); it != mSortedProperties.constEnd(); it++) { | 132 | for (auto it = mSortedProperties.constBegin(); it != mSortedProperties.constEnd(); it++) { |
133 | const auto propertyValue = bufferAdaptor.getProperty(it.key()); | 133 | const auto propertyValue = entity.getProperty(it.key()); |
134 | const auto sortValue = bufferAdaptor.getProperty(it.value()); | 134 | const auto sortValue = entity.getProperty(it.value()); |
135 | if (sortValue.type() == QVariant::DateTime) { | 135 | if (sortValue.type() == QVariant::DateTime) { |
136 | Index(indexName(it.key(), it.value()), transaction).remove(propertyValue.toByteArray() + toSortableByteArray(sortValue.toDateTime()), identifier); | 136 | Index(indexName(it.key(), it.value()), transaction).remove(propertyValue.toByteArray() + toSortableByteArray(sortValue.toDateTime()), identifier); |
137 | } else { | 137 | } else { |
diff --git a/common/typeindex.h b/common/typeindex.h index 2638577..e11e673 100644 --- a/common/typeindex.h +++ b/common/typeindex.h | |||
@@ -19,7 +19,6 @@ | |||
19 | #pragma once | 19 | #pragma once |
20 | 20 | ||
21 | #include "resultset.h" | 21 | #include "resultset.h" |
22 | #include "bufferadaptor.h" | ||
23 | #include "storage.h" | 22 | #include "storage.h" |
24 | #include "query.h" | 23 | #include "query.h" |
25 | #include "log.h" | 24 | #include "log.h" |
@@ -52,8 +51,8 @@ public: | |||
52 | { | 51 | { |
53 | mSecondaryProperties.insert(Left::name, Right::name); | 52 | mSecondaryProperties.insert(Left::name, Right::name); |
54 | } | 53 | } |
55 | void add(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); | 54 | void add(const QByteArray &identifier, const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Storage::DataStore::Transaction &transaction); |
56 | void remove(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); | 55 | void remove(const QByteArray &identifier, const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Storage::DataStore::Transaction &transaction); |
57 | 56 | ||
58 | QVector<QByteArray> query(const Sink::Query &query, QSet<QByteArray> &appliedFilters, QByteArray &appliedSorting, Sink::Storage::DataStore::Transaction &transaction); | 57 | QVector<QByteArray> query(const Sink::Query &query, QSet<QByteArray> &appliedFilters, QByteArray &appliedSorting, Sink::Storage::DataStore::Transaction &transaction); |
59 | QVector<QByteArray> lookup(const QByteArray &property, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction); | 58 | QVector<QByteArray> lookup(const QByteArray &property, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction); |