summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/CMakeLists.txt1
-rw-r--r--common/datastorequery.cpp246
-rw-r--r--common/datastorequery.h59
-rw-r--r--common/domain/event.cpp13
-rw-r--r--common/domain/event.h2
-rw-r--r--common/domain/folder.cpp11
-rw-r--r--common/domain/folder.h2
-rw-r--r--common/domain/mail.cpp13
-rw-r--r--common/domain/mail.h2
-rw-r--r--common/entitybuffer.cpp14
-rw-r--r--common/entitybuffer.h6
-rw-r--r--common/entityreader.cpp250
-rw-r--r--common/entityreader.h11
-rw-r--r--common/resultset.cpp6
-rw-r--r--common/resultset.h7
15 files changed, 409 insertions, 234 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index 5eb15ba..0fc8d81 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -75,6 +75,7 @@ set(command_SRCS
75 entityreader.cpp 75 entityreader.cpp
76 mailpreprocessor.cpp 76 mailpreprocessor.cpp
77 specialpurposepreprocessor.cpp 77 specialpurposepreprocessor.cpp
78 datastorequery.cpp
78 ${storage_SRCS}) 79 ${storage_SRCS})
79 80
80add_library(${PROJECT_NAME} SHARED ${command_SRCS}) 81add_library(${PROJECT_NAME} SHARED ${command_SRCS})
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp
new file mode 100644
index 0000000..3237c53
--- /dev/null
+++ b/common/datastorequery.cpp
@@ -0,0 +1,246 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the
16 * Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19#include "datastorequery.h"
20
21#include "log.h"
22#include "entitybuffer.h"
23#include "entity_generated.h"
24
25using namespace Sink;
26
27
28SINK_DEBUG_AREA("datastorequery")
29
30DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> getProperty)
31 : mQuery(query), mTransaction(transaction), mType(type), mTypeIndex(typeIndex), mDb(Storage::mainDatabase(mTransaction, mType)), mGetProperty(getProperty)
32{
33
34}
35
36static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType)
37{
38 // TODO use a result set with an iterator, to read values on demand
39 SinkTrace() << "Looking for : " << bufferType;
40 //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate.
41 QSet<QByteArray> keys;
42 Storage::mainDatabase(transaction, bufferType)
43 .scan(QByteArray(),
44 [&](const QByteArray &key, const QByteArray &value) -> bool {
45 if (keys.contains(Sink::Storage::uidFromKey(key))) {
46 //Not something that should persist if the replay works, so we keep a message for now.
47 SinkTrace() << "Multiple revisions for key: " << key;
48 }
49 keys << Sink::Storage::uidFromKey(key);
50 return true;
51 },
52 [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; });
53
54 SinkTrace() << "Full scan retrieved " << keys.size() << " results.";
55 return ResultSet(keys.toList().toVector());
56}
57
58ResultSet DataStoreQuery::loadInitialResultSet(QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting)
59{
60 if (!mQuery.ids.isEmpty()) {
61 return ResultSet(mQuery.ids.toVector());
62 }
63 QSet<QByteArray> appliedFilters;
64 QByteArray appliedSorting;
65
66 auto resultSet = mTypeIndex.query(mQuery, appliedFilters, appliedSorting, mTransaction);
67
68 remainingFilters = mQuery.propertyFilter.keys().toSet() - appliedFilters;
69 if (appliedSorting.isEmpty()) {
70 remainingSorting = mQuery.sortProperty;
71 }
72
73 // We do a full scan if there were no indexes available to create the initial set.
74 if (appliedFilters.isEmpty()) {
75 // TODO this should be replaced by an index lookup as well
76 resultSet = fullScan(mTransaction, mType);
77 }
78 return resultSet;
79}
80
81ResultSet DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision, QSet<QByteArray> &remainingFilters)
82{
83 const auto bufferType = mType;
84 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
85 remainingFilters = mQuery.propertyFilter.keys().toSet();
86 return ResultSet([this, bufferType, revisionCounter]() -> QByteArray {
87 const qint64 topRevision = Sink::Storage::maxRevision(mTransaction);
88 // Spit out the revision keys one by one.
89 while (*revisionCounter <= topRevision) {
90 const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter);
91 const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter);
92 // SinkTrace() << "Revision" << *revisionCounter << type << uid;
93 Q_ASSERT(!uid.isEmpty());
94 Q_ASSERT(!type.isEmpty());
95 if (type != bufferType) {
96 // Skip revision
97 *revisionCounter += 1;
98 continue;
99 }
100 const auto key = Sink::Storage::assembleKey(uid, *revisionCounter);
101 *revisionCounter += 1;
102 return key;
103 }
104 SinkTrace() << "Finished reading incremental result set:" << *revisionCounter;
105 // We're done
106 return QByteArray();
107 });
108}
109
110void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback)
111{
112 mDb.findLatest(key,
113 [=](const QByteArray &key, const QByteArray &value) -> bool {
114 resultCallback(Sink::Storage::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size()));
115 return false;
116 },
117 [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; });
118}
119
120QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArray &property)
121{
122 return mGetProperty(entity, property);
123}
124
125ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, bool initialQuery, const QByteArray &sortProperty)
126{
127 const bool sortingRequired = !sortProperty.isEmpty();
128 if (initialQuery && sortingRequired) {
129 SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty;
130 // Sort the complete set by reading the sort property and filling into a sorted map
131 auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create();
132 while (resultSet.next()) {
133 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
134 readEntity(resultSet.id(),
135 [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const QByteArray &uid, const Sink::EntityBuffer &buffer) {
136
137 const auto operation = buffer.operation();
138
139 // We're not interested in removals during the initial query
140 if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) {
141 if (!sortProperty.isEmpty()) {
142 const auto sortValue = getProperty(buffer.entity(), sortProperty);
143 if (sortValue.type() == QVariant::DateTime) {
144 sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), uid);
145 } else {
146 sortedMap->insert(sortValue.toString().toLatin1(), uid);
147 }
148 } else {
149 sortedMap->insert(uid, uid);
150 }
151 }
152 });
153 }
154
155 SinkTrace() << "Sorted " << sortedMap->size() << " values.";
156 auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap);
157 ResultSet::ValueGenerator generator = [this, iterator, sortedMap, filter, initialQuery](
158 std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entity, Sink::Operation)> callback) -> bool {
159 if (iterator->hasNext()) {
160 readEntity(iterator->next().value(), [this, filter, callback, initialQuery](const QByteArray &uid, const Sink::EntityBuffer &buffer) {
161 callback(uid, buffer, Sink::Operation_Creation);
162 });
163 return true;
164 }
165 return false;
166 };
167
168 auto skip = [iterator]() {
169 if (iterator->hasNext()) {
170 iterator->next();
171 }
172 };
173 return ResultSet(generator, skip);
174 } else {
175 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
176 ResultSet::ValueGenerator generator = [this, resultSetPtr, filter, initialQuery](const ResultSet::Callback &callback) -> bool {
177 if (resultSetPtr->next()) {
178 SinkTrace() << "Reading the next value: " << resultSetPtr->id();
179 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
180 readEntity(resultSetPtr->id(), [this, filter, callback, initialQuery](const QByteArray &uid, const Sink::EntityBuffer &buffer) {
181 const auto operation = buffer.operation();
182 if (initialQuery) {
183 // We're not interested in removals during the initial query
184 if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) {
185 // In the initial set every entity is new
186 callback(uid, buffer, Sink::Operation_Creation);
187 }
188 } else {
189 // Always remove removals, they probably don't match due to non-available properties
190 if ((operation == Sink::Operation_Removal) || filter(uid, buffer)) {
191 // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results)
192 callback(uid, buffer, operation);
193 }
194 }
195 });
196 return true;
197 }
198 return false;
199 };
200 auto skip = [resultSetPtr]() { resultSetPtr->skip(1); };
201 return ResultSet(generator, skip);
202 }
203}
204
205
206DataStoreQuery::FilterFunction DataStoreQuery::getFilter(const QSet<QByteArray> &remainingFilters)
207{
208 auto query = mQuery;
209 return [this, remainingFilters, query](const QByteArray &uid, const Sink::EntityBuffer &entity) -> bool {
210 if (!query.ids.isEmpty()) {
211 if (!query.ids.contains(uid)) {
212 SinkTrace() << "Filter by uid: " << uid;
213 return false;
214 }
215 }
216 for (const auto &filterProperty : remainingFilters) {
217 const auto property = getProperty(entity.entity(), filterProperty);
218 const auto comparator = query.propertyFilter.value(filterProperty);
219 if (!comparator.matches(property)) {
220 SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value;
221 return false;
222 }
223 }
224 return true;
225 };
226}
227
228ResultSet DataStoreQuery::update(qint64 baseRevision)
229{
230 SinkTrace() << "Executing query update";
231 QSet<QByteArray> remainingFilters;
232 QByteArray remainingSorting;
233 auto resultSet = loadIncrementalResultSet(baseRevision, remainingFilters);
234 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), false, remainingSorting);
235 return filteredSet;
236}
237
238ResultSet DataStoreQuery::execute()
239{
240 SinkTrace() << "Executing query";
241 QSet<QByteArray> remainingFilters;
242 QByteArray remainingSorting;
243 auto resultSet = loadInitialResultSet(remainingFilters, remainingSorting);
244 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), true, remainingSorting);
245 return filteredSet;
246}
diff --git a/common/datastorequery.h b/common/datastorequery.h
new file mode 100644
index 0000000..cf9d9e2
--- /dev/null
+++ b/common/datastorequery.h
@@ -0,0 +1,59 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the
16 * Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19#pragma once
20
21#include "query.h"
22#include "storage.h"
23#include "resultset.h"
24#include "typeindex.h"
25#include "query.h"
26#include "entitybuffer.h"
27
28class DataStoreQuery {
29public:
30 DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> getProperty);
31 ResultSet execute();
32 ResultSet update(qint64 baseRevision);
33
34private:
35
36 typedef std::function<bool(const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> FilterFunction;
37 typedef std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> BufferCallback;
38
39 QVariant getProperty(const Sink::Entity &entity, const QByteArray &property);
40
41 void readEntity(const QByteArray &key, const BufferCallback &resultCallback);
42
43 ResultSet loadInitialResultSet(QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting);
44 ResultSet loadIncrementalResultSet(qint64 baseRevision, QSet<QByteArray> &remainingFilters);
45
46 ResultSet filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, bool initialQuery, const QByteArray &sortProperty);
47 FilterFunction getFilter(const QSet<QByteArray> &remainingFilters);
48
49 Sink::Query mQuery;
50 Sink::Storage::Transaction &mTransaction;
51 const QByteArray mType;
52 TypeIndex &mTypeIndex;
53 Sink::Storage::NamedDatabase mDb;
54 std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> mGetProperty;
55};
56
57
58
59
diff --git a/common/domain/event.cpp b/common/domain/event.cpp
index 0909bf1..dfbcb61 100644
--- a/common/domain/event.cpp
+++ b/common/domain/event.cpp
@@ -32,6 +32,8 @@
32#include "../query.h" 32#include "../query.h"
33#include "../definitions.h" 33#include "../definitions.h"
34#include "../typeindex.h" 34#include "../typeindex.h"
35#include "entitybuffer.h"
36#include "entity_generated.h"
35 37
36#include "event_generated.h" 38#include "event_generated.h"
37 39
@@ -84,3 +86,14 @@ QSharedPointer<WritePropertyMapper<TypeImplementation<Event>::BufferBuilder> > T
84 propertyMapper->addMapping<Event::Attachment>(&BufferBuilder::add_attachment); 86 propertyMapper->addMapping<Event::Attachment>(&BufferBuilder::add_attachment);
85 return propertyMapper; 87 return propertyMapper;
86} 88}
89
90DataStoreQuery TypeImplementation<Event>::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction)
91{
92
93 auto mapper = initializeReadPropertyMapper();
94 return DataStoreQuery(query, ApplicationDomain::getTypeName<Event>(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) {
95
96 const auto localBuffer = Sink::EntityBuffer::readBuffer<Buffer>(entity.local());
97 return mapper->getProperty(property, localBuffer);
98 });
99}
diff --git a/common/domain/event.h b/common/domain/event.h
index 5315566..4ac572c 100644
--- a/common/domain/event.h
+++ b/common/domain/event.h
@@ -21,6 +21,7 @@
21#include "applicationdomaintype.h" 21#include "applicationdomaintype.h"
22 22
23#include "storage.h" 23#include "storage.h"
24#include "datastorequery.h"
24 25
25class ResultSet; 26class ResultSet;
26class QByteArray; 27class QByteArray;
@@ -50,6 +51,7 @@ public:
50 typedef Sink::ApplicationDomain::Buffer::Event Buffer; 51 typedef Sink::ApplicationDomain::Buffer::Event Buffer;
51 typedef Sink::ApplicationDomain::Buffer::EventBuilder BufferBuilder; 52 typedef Sink::ApplicationDomain::Buffer::EventBuilder BufferBuilder;
52 static QSet<QByteArray> indexedProperties(); 53 static QSet<QByteArray> indexedProperties();
54 static DataStoreQuery prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction);
53 /** 55 /**
54 * Returns the potential result set based on the indexes. 56 * Returns the potential result set based on the indexes.
55 * 57 *
diff --git a/common/domain/folder.cpp b/common/domain/folder.cpp
index ddb0c10..6d487b1 100644
--- a/common/domain/folder.cpp
+++ b/common/domain/folder.cpp
@@ -32,6 +32,8 @@
32#include "../query.h" 32#include "../query.h"
33#include "../definitions.h" 33#include "../definitions.h"
34#include "../typeindex.h" 34#include "../typeindex.h"
35#include "entitybuffer.h"
36#include "entity_generated.h"
35 37
36#include "folder_generated.h" 38#include "folder_generated.h"
37 39
@@ -88,3 +90,12 @@ QSharedPointer<WritePropertyMapper<TypeImplementation<Folder>::BufferBuilder> >
88 propertyMapper->addMapping<Folder::SpecialPurpose>(&BufferBuilder::add_specialpurpose); 90 propertyMapper->addMapping<Folder::SpecialPurpose>(&BufferBuilder::add_specialpurpose);
89 return propertyMapper; 91 return propertyMapper;
90} 92}
93
94DataStoreQuery TypeImplementation<Folder>::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction)
95{
96 auto mapper = initializeReadPropertyMapper();
97 return DataStoreQuery(query, ApplicationDomain::getTypeName<Folder>(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) {
98 const auto localBuffer = Sink::EntityBuffer::readBuffer<Buffer>(entity.local());
99 return mapper->getProperty(property, localBuffer);
100 });
101}
diff --git a/common/domain/folder.h b/common/domain/folder.h
index 6e066e1..77edc8a 100644
--- a/common/domain/folder.h
+++ b/common/domain/folder.h
@@ -21,6 +21,7 @@
21#include "applicationdomaintype.h" 21#include "applicationdomaintype.h"
22 22
23#include "storage.h" 23#include "storage.h"
24#include "datastorequery.h"
24 25
25class ResultSet; 26class ResultSet;
26class QByteArray; 27class QByteArray;
@@ -44,6 +45,7 @@ class TypeImplementation<Sink::ApplicationDomain::Folder> {
44public: 45public:
45 typedef Sink::ApplicationDomain::Buffer::Folder Buffer; 46 typedef Sink::ApplicationDomain::Buffer::Folder Buffer;
46 typedef Sink::ApplicationDomain::Buffer::FolderBuilder BufferBuilder; 47 typedef Sink::ApplicationDomain::Buffer::FolderBuilder BufferBuilder;
48 static DataStoreQuery prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction);
47 static QSet<QByteArray> indexedProperties(); 49 static QSet<QByteArray> indexedProperties();
48 static ResultSet queryIndexes(const Sink::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction); 50 static ResultSet queryIndexes(const Sink::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction);
49 static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); 51 static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction);
diff --git a/common/domain/mail.cpp b/common/domain/mail.cpp
index 13e1305..bb5ad58 100644
--- a/common/domain/mail.cpp
+++ b/common/domain/mail.cpp
@@ -32,6 +32,8 @@
32#include "../query.h" 32#include "../query.h"
33#include "../definitions.h" 33#include "../definitions.h"
34#include "../typeindex.h" 34#include "../typeindex.h"
35#include "entitybuffer.h"
36#include "entity_generated.h"
35 37
36#include "mail_generated.h" 38#include "mail_generated.h"
37 39
@@ -110,3 +112,14 @@ QSharedPointer<WritePropertyMapper<TypeImplementation<Mail>::BufferBuilder> > Ty
110 propertyMapper->addMapping<Mail::Sent>(&BufferBuilder::add_sent); 112 propertyMapper->addMapping<Mail::Sent>(&BufferBuilder::add_sent);
111 return propertyMapper; 113 return propertyMapper;
112} 114}
115
116DataStoreQuery TypeImplementation<Mail>::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction)
117{
118 auto mapper = initializeReadPropertyMapper();
119 return DataStoreQuery(query, ApplicationDomain::getTypeName<Mail>(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) {
120
121 const auto localBuffer = Sink::EntityBuffer::readBuffer<Buffer>(entity.local());
122 return mapper->getProperty(property, localBuffer);
123 });
124}
125
diff --git a/common/domain/mail.h b/common/domain/mail.h
index ff169dd..d6af9c5 100644
--- a/common/domain/mail.h
+++ b/common/domain/mail.h
@@ -21,6 +21,7 @@
21#include "applicationdomaintype.h" 21#include "applicationdomaintype.h"
22 22
23#include "storage.h" 23#include "storage.h"
24#include "datastorequery.h"
24 25
25class ResultSet; 26class ResultSet;
26class QByteArray; 27class QByteArray;
@@ -44,6 +45,7 @@ class TypeImplementation<Sink::ApplicationDomain::Mail> {
44public: 45public:
45 typedef Sink::ApplicationDomain::Buffer::Mail Buffer; 46 typedef Sink::ApplicationDomain::Buffer::Mail Buffer;
46 typedef Sink::ApplicationDomain::Buffer::MailBuilder BufferBuilder; 47 typedef Sink::ApplicationDomain::Buffer::MailBuilder BufferBuilder;
48 static DataStoreQuery prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction);
47 static QSet<QByteArray> indexedProperties(); 49 static QSet<QByteArray> indexedProperties();
48 /** 50 /**
49 * Returns the potential result set based on the indexes. 51 * Returns the potential result set based on the indexes.
diff --git a/common/entitybuffer.cpp b/common/entitybuffer.cpp
index 950bc46..32583cc 100644
--- a/common/entitybuffer.cpp
+++ b/common/entitybuffer.cpp
@@ -26,7 +26,7 @@ bool EntityBuffer::isValid() const
26 return mEntity; 26 return mEntity;
27} 27}
28 28
29const Sink::Entity &EntityBuffer::entity() 29const Sink::Entity &EntityBuffer::entity() const
30{ 30{
31 Q_ASSERT(mEntity); 31 Q_ASSERT(mEntity);
32 return *mEntity; 32 return *mEntity;
@@ -84,3 +84,15 @@ void EntityBuffer::assembleEntityBuffer(
84 auto entity = Sink::CreateEntity(fbb, metadata, resource, local); 84 auto entity = Sink::CreateEntity(fbb, metadata, resource, local);
85 Sink::FinishEntityBuffer(fbb, entity); 85 Sink::FinishEntityBuffer(fbb, entity);
86} 86}
87
88Sink::Operation EntityBuffer::operation() const
89{
90 const auto metadataBuffer = readBuffer<Sink::Metadata>(mEntity->metadata());
91 return metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation;
92}
93
94qint64 EntityBuffer::revision() const
95{
96 const auto metadataBuffer = readBuffer<Sink::Metadata>(mEntity->metadata());
97 return metadataBuffer ? metadataBuffer->revision() : -1;
98}
diff --git a/common/entitybuffer.h b/common/entitybuffer.h
index 866a7d0..4162605 100644
--- a/common/entitybuffer.h
+++ b/common/entitybuffer.h
@@ -3,6 +3,7 @@
3#include "sink_export.h" 3#include "sink_export.h"
4#include <functional> 4#include <functional>
5#include <flatbuffers/flatbuffers.h> 5#include <flatbuffers/flatbuffers.h>
6#include "metadata_generated.h"
6#include <QByteArray> 7#include <QByteArray>
7 8
8namespace Sink { 9namespace Sink {
@@ -16,9 +17,12 @@ public:
16 const uint8_t *resourceBuffer(); 17 const uint8_t *resourceBuffer();
17 const uint8_t *metadataBuffer(); 18 const uint8_t *metadataBuffer();
18 const uint8_t *localBuffer(); 19 const uint8_t *localBuffer();
19 const Entity &entity(); 20 const Entity &entity() const;
20 bool isValid() const; 21 bool isValid() const;
21 22
23 Sink::Operation operation() const;
24 qint64 revision() const;
25
22 static void extractResourceBuffer(void *dataValue, int dataSize, const std::function<void(const uint8_t *, size_t size)> &handler); 26 static void extractResourceBuffer(void *dataValue, int dataSize, const std::function<void(const uint8_t *, size_t size)> &handler);
23 /* 27 /*
24 * TODO: Ideally we would be passing references to vectors in the same bufferbuilder, to avoid needlessly copying data. 28 * TODO: Ideally we would be passing references to vectors in the same bufferbuilder, to avoid needlessly copying data.
diff --git a/common/entityreader.cpp b/common/entityreader.cpp
index 01c25d2..faa154b 100644
--- a/common/entityreader.cpp
+++ b/common/entityreader.cpp
@@ -150,205 +150,41 @@ void EntityReader<DomainType>::query(const Sink::Query &query, const std::functi
150 }); 150 });
151} 151}
152 152
153template <class DomainType> 153/* template <class DomainType> */
154void EntityReader<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, 154/* void EntityReader<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, */
155 const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback) 155/* const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback) */
156{ 156/* { */
157 db.findLatest(key, 157/* db.findLatest(key, */
158 [=](const QByteArray &key, const QByteArray &value) -> bool { 158/* [=](const QByteArray &key, const QByteArray &value) -> bool { */
159 Sink::EntityBuffer buffer(value.data(), value.size()); 159/* Sink::EntityBuffer buffer(value.data(), value.size()); */
160 const Sink::Entity &entity = buffer.entity(); 160/* const Sink::Entity &entity = buffer.entity(); */
161 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); 161/* const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); */
162 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; 162/* const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; */
163 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; 163/* const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; */
164 auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(entity); 164/* auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(entity); */
165 Q_ASSERT(adaptor); 165/* Q_ASSERT(adaptor); */
166 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); 166/* resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); */
167 return false; 167/* return false; */
168 }, 168/* }, */
169 [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); 169/* [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); */
170} 170/* } */
171
172static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType)
173{
174 // TODO use a result set with an iterator, to read values on demand
175 SinkTrace() << "Looking for : " << bufferType;
176 //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate.
177 QSet<QByteArray> keys;
178 Storage::mainDatabase(transaction, bufferType)
179 .scan(QByteArray(),
180 [&](const QByteArray &key, const QByteArray &value) -> bool {
181 if (keys.contains(Sink::Storage::uidFromKey(key))) {
182 //Not something that should persist if the replay works, so we keep a message for now.
183 SinkTrace() << "Multiple revisions for key: " << key;
184 }
185 keys << Sink::Storage::uidFromKey(key);
186 return true;
187 },
188 [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; });
189
190 SinkTrace() << "Full scan retrieved " << keys.size() << " results.";
191 return ResultSet(keys.toList().toVector());
192}
193
194template <class DomainType>
195ResultSet EntityReader<DomainType>::loadInitialResultSet(const Sink::Query &query, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting)
196{
197 if (!query.ids.isEmpty()) {
198 return ResultSet(query.ids.toVector());
199 }
200 QSet<QByteArray> appliedFilters;
201 QByteArray appliedSorting;
202 auto resultSet = Sink::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, appliedSorting, mTransaction);
203 remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters;
204 if (appliedSorting.isEmpty()) {
205 remainingSorting = query.sortProperty;
206 }
207 171
208 // We do a full scan if there were no indexes available to create the initial set.
209 if (appliedFilters.isEmpty()) {
210 // TODO this should be replaced by an index lookup as well
211 resultSet = fullScan(mTransaction, ApplicationDomain::getTypeName<DomainType>());
212 }
213 return resultSet;
214}
215
216template <class DomainType>
217ResultSet EntityReader<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, QSet<QByteArray> &remainingFilters)
218{
219 const auto bufferType = ApplicationDomain::getTypeName<DomainType>();
220 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
221 remainingFilters = query.propertyFilter.keys().toSet();
222 return ResultSet([this, bufferType, revisionCounter]() -> QByteArray {
223 const qint64 topRevision = Sink::Storage::maxRevision(mTransaction);
224 // Spit out the revision keys one by one.
225 while (*revisionCounter <= topRevision) {
226 const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter);
227 const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter);
228 // SinkTrace() << "Revision" << *revisionCounter << type << uid;
229 Q_ASSERT(!uid.isEmpty());
230 Q_ASSERT(!type.isEmpty());
231 if (type != bufferType) {
232 // Skip revision
233 *revisionCounter += 1;
234 continue;
235 }
236 const auto key = Sink::Storage::assembleKey(uid, *revisionCounter);
237 *revisionCounter += 1;
238 return key;
239 }
240 SinkTrace() << "Finished reading incremental result set:" << *revisionCounter;
241 // We're done
242 return QByteArray();
243 });
244}
245
246template <class DomainType>
247ResultSet EntityReader<DomainType>::filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter,
248 const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty)
249{
250 const bool sortingRequired = !sortProperty.isEmpty();
251 if (initialQuery && sortingRequired) {
252 SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty;
253 // Sort the complete set by reading the sort property and filling into a sorted map
254 auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create();
255 while (resultSet.next()) {
256 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
257 readEntity(db, resultSet.id(),
258 [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) {
259 // We're not interested in removals during the initial query
260 if ((operation != Sink::Operation_Removal) && filter(domainObject)) {
261 if (!sortProperty.isEmpty()) {
262 const auto sortValue = domainObject->getProperty(sortProperty);
263 if (sortValue.type() == QVariant::DateTime) {
264 sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), domainObject->identifier());
265 } else {
266 sortedMap->insert(sortValue.toString().toLatin1(), domainObject->identifier());
267 }
268 } else {
269 sortedMap->insert(domainObject->identifier(), domainObject->identifier());
270 }
271 }
272 });
273 }
274
275 SinkTrace() << "Sorted " << sortedMap->size() << " values.";
276 auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap);
277 ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery](
278 std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool {
279 if (iterator->hasNext()) {
280 readEntity(db, iterator->next().value(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject,
281 Sink::Operation operation) { callback(domainObject, Sink::Operation_Creation); });
282 return true;
283 }
284 return false;
285 };
286 172
287 auto skip = [iterator]() {
288 if (iterator->hasNext()) {
289 iterator->next();
290 }
291 };
292 return ResultSet(generator, skip);
293 } else {
294 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
295 ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery](
296 std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool {
297 if (resultSetPtr->next()) {
298 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
299 readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) {
300 if (initialQuery) {
301 // We're not interested in removals during the initial query
302 if ((operation != Sink::Operation_Removal) && filter(domainObject)) {
303 // In the initial set every entity is new
304 callback(domainObject, Sink::Operation_Creation);
305 }
306 } else {
307 // Always remove removals, they probably don't match due to non-available properties
308 if ((operation == Sink::Operation_Removal) || filter(domainObject)) {
309 // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results)
310 callback(domainObject, operation);
311 }
312 }
313 });
314 return true;
315 }
316 return false;
317 };
318 auto skip = [resultSetPtr]() { resultSetPtr->skip(1); };
319 return ResultSet(generator, skip);
320 }
321}
322 173
323template <class DomainType> 174template <class DomainType>
324QPair<qint64, qint64> EntityReader<DomainType>::load(const Sink::Query &query, const std::function<ResultSet(QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, bool initialQuery, int offset, int batchSize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback) 175QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback)
325{ 176{
326 QTime time; 177 QTime time;
327 time.start(); 178 time.start();
328 179
329 auto db = Storage::mainDatabase(mTransaction, ApplicationDomain::getTypeName<DomainType>()); 180 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction);
181 auto resultSet = preparedQuery.execute();
330 182
331 QSet<QByteArray> remainingFilters;
332 QByteArray remainingSorting;
333 auto resultSet = baseSetRetriever(remainingFilters, remainingSorting);
334 SinkTrace() << "Base set retrieved. " << Log::TraceTime(time.elapsed());
335 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, remainingSorting);
336 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 183 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
337 auto replayedEntities = replaySet(filteredSet, offset, batchSize, callback); 184 auto replayedEntities = replaySet(resultSet, offset, batchsize, callback);
338 // SinkTrace() << "Filtered set replayed. " << Log::TraceTime(time.elapsed());
339 return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities);
340}
341 185
342template <class DomainType>
343QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback)
344{
345 QTime time;
346 time.start();
347 auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet {
348 return loadInitialResultSet(query, remainingFilters, remainingSorting);
349 }, true, offset, batchsize, callback);
350 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); 186 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed());
351 return revisionAndReplayedEntities; 187 return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities);
352} 188}
353 189
354template <class DomainType> 190template <class DomainType>
@@ -357,33 +193,15 @@ QPair<qint64, qint64> EntityReader<DomainType>::executeIncrementalQuery(const Si
357 QTime time; 193 QTime time;
358 time.start(); 194 time.start();
359 const qint64 baseRevision = lastRevision + 1; 195 const qint64 baseRevision = lastRevision + 1;
360 auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet {
361 return loadIncrementalResultSet(baseRevision, query, remainingFilters);
362 }, false, 0, 0, callback);
363 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed());
364 return revisionAndReplayedEntities;
365}
366 196
367template <class DomainType> 197 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction);
368std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> 198 auto resultSet = preparedQuery.update(baseRevision);
369EntityReader<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query) 199
370{ 200 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
371 return [this, remainingFilters, query](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { 201 auto replayedEntities = replaySet(resultSet, 0, 0, callback);
372 if (!query.ids.isEmpty()) { 202
373 if (!query.ids.contains(domainObject->identifier())) { 203 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed());
374 return false; 204 return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities);
375 }
376 }
377 for (const auto &filterProperty : remainingFilters) {
378 const auto property = domainObject->getProperty(filterProperty);
379 const auto comparator = query.propertyFilter.value(filterProperty);
380 if (!comparator.matches(property)) {
381 SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value;
382 return false;
383 }
384 }
385 return true;
386 };
387} 205}
388 206
389template <class DomainType> 207template <class DomainType>
@@ -394,9 +212,11 @@ qint64 EntityReader<DomainType>::replaySet(ResultSet &resultSet, int offset, int
394 int counter = 0; 212 int counter = 0;
395 while (!batchSize || (counter < batchSize)) { 213 while (!batchSize || (counter < batchSize)) {
396 const bool ret = 214 const bool ret =
397 resultSet.next([&counter, callback](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { 215 resultSet.next([this, &counter, callback](const QByteArray &uid, const Sink::EntityBuffer &value, Sink::Operation operation) -> bool {
398 counter++; 216 counter++;
399 return callback(value.staticCast<DomainType>(), operation); 217 auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(value.entity());
218 Q_ASSERT(adaptor);
219 return callback(QSharedPointer<DomainType>::create(mResourceInstanceIdentifier, uid, value.revision(), adaptor), operation);
400 }); 220 });
401 if (!ret) { 221 if (!ret) {
402 break; 222 break;
diff --git a/common/entityreader.h b/common/entityreader.h
index a479679..f216453 100644
--- a/common/entityreader.h
+++ b/common/entityreader.h
@@ -89,17 +89,6 @@ public:
89private: 89private:
90 qint64 replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback); 90 qint64 replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback);
91 91
92 void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key,
93 const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback);
94
95 ResultSet loadInitialResultSet(const Sink::Query &query, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting);
96 ResultSet loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, QSet<QByteArray> &remainingFilters);
97
98 ResultSet filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter,
99 const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty);
100 std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query);
101 QPair<qint64, qint64> load(const Sink::Query &query, const std::function<ResultSet(QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, bool initialQuery, int offset, int batchSize, const ResultCallback &callback);
102
103private: 92private:
104 QByteArray mResourceInstanceIdentifier; 93 QByteArray mResourceInstanceIdentifier;
105 Sink::Storage::Transaction &mTransaction; 94 Sink::Storage::Transaction &mTransaction;
diff --git a/common/resultset.cpp b/common/resultset.cpp
index 293035b..51914e9 100644
--- a/common/resultset.cpp
+++ b/common/resultset.cpp
@@ -18,7 +18,7 @@
18 */ 18 */
19#include "resultset.h" 19#include "resultset.h"
20 20
21#include "common/log.h" 21#include "log.h"
22 22
23ResultSet::ResultSet() : mIt(nullptr) 23ResultSet::ResultSet() : mIt(nullptr)
24{ 24{
@@ -78,12 +78,12 @@ bool ResultSet::next()
78 return true; 78 return true;
79 } 79 }
80 } else { 80 } else {
81 next([](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation) { return false; }); 81 next([](const QByteArray &, const Sink::EntityBuffer &, Sink::Operation) { return false; });
82 } 82 }
83 return false; 83 return false;
84} 84}
85 85
86bool ResultSet::next(std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation)> callback) 86bool ResultSet::next(const Callback &callback)
87{ 87{
88 Q_ASSERT(mValueGenerator); 88 Q_ASSERT(mValueGenerator);
89 return mValueGenerator(callback); 89 return mValueGenerator(callback);
diff --git a/common/resultset.h b/common/resultset.h
index 88f7055..4e934fc 100644
--- a/common/resultset.h
+++ b/common/resultset.h
@@ -20,8 +20,8 @@
20 20
21#include <QVector> 21#include <QVector>
22#include <functional> 22#include <functional>
23#include "domain/applicationdomaintype.h"
24#include "metadata_generated.h" 23#include "metadata_generated.h"
24#include "entitybuffer.h"
25 25
26/* 26/*
27 * An iterator to a result set. 27 * An iterator to a result set.
@@ -31,7 +31,8 @@
31class ResultSet 31class ResultSet
32{ 32{
33public: 33public:
34 typedef std::function<bool(std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)>)> ValueGenerator; 34 typedef std::function<void(const QByteArray &uid, const Sink::EntityBuffer &, Sink::Operation)> Callback;
35 typedef std::function<bool(Callback)> ValueGenerator;
35 typedef std::function<QByteArray()> IdGenerator; 36 typedef std::function<QByteArray()> IdGenerator;
36 typedef std::function<void()> SkipValue; 37 typedef std::function<void()> SkipValue;
37 38
@@ -42,7 +43,7 @@ public:
42 ResultSet(const ResultSet &other); 43 ResultSet(const ResultSet &other);
43 44
44 bool next(); 45 bool next();
45 bool next(std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation)> callback); 46 bool next(const Callback &callback);
46 47
47 void skip(int number); 48 void skip(int number);
48 49