summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-09-19 18:55:21 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-09-19 18:55:21 +0200
commit4a14a6fade947aa830d3f21598a4a6ba7316b933 (patch)
treec6b340bf1c6284e5501d371f65b58e3a69391a26 /common
parent1deac558af4b1c9f04352ede7f8e172f11a70a6b (diff)
downloadsink-4a14a6fade947aa830d3f21598a4a6ba7316b933.tar.gz
sink-4a14a6fade947aa830d3f21598a4a6ba7316b933.zip
Refactored the query part of the entity reader into DataStoreQuery.
DataStoreQuery now encapsulates the low-level query that operates directly on the storage. It no longer has access to the resource buffers, and is instantiated by the type implementation, so we can specialize the query alogorithm per type, but not per resource. This will allow us to implement the threading queries for the mailtype.
Diffstat (limited to 'common')
-rw-r--r--common/CMakeLists.txt1
-rw-r--r--common/datastorequery.cpp246
-rw-r--r--common/datastorequery.h59
-rw-r--r--common/domain/event.cpp13
-rw-r--r--common/domain/event.h2
-rw-r--r--common/domain/folder.cpp11
-rw-r--r--common/domain/folder.h2
-rw-r--r--common/domain/mail.cpp13
-rw-r--r--common/domain/mail.h2
-rw-r--r--common/entitybuffer.cpp14
-rw-r--r--common/entitybuffer.h6
-rw-r--r--common/entityreader.cpp250
-rw-r--r--common/entityreader.h11
-rw-r--r--common/resultset.cpp6
-rw-r--r--common/resultset.h7
15 files changed, 409 insertions, 234 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index 5eb15ba..0fc8d81 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -75,6 +75,7 @@ set(command_SRCS
75 entityreader.cpp 75 entityreader.cpp
76 mailpreprocessor.cpp 76 mailpreprocessor.cpp
77 specialpurposepreprocessor.cpp 77 specialpurposepreprocessor.cpp
78 datastorequery.cpp
78 ${storage_SRCS}) 79 ${storage_SRCS})
79 80
80add_library(${PROJECT_NAME} SHARED ${command_SRCS}) 81add_library(${PROJECT_NAME} SHARED ${command_SRCS})
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp
new file mode 100644
index 0000000..3237c53
--- /dev/null
+++ b/common/datastorequery.cpp
@@ -0,0 +1,246 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the
16 * Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19#include "datastorequery.h"
20
21#include "log.h"
22#include "entitybuffer.h"
23#include "entity_generated.h"
24
25using namespace Sink;
26
27
28SINK_DEBUG_AREA("datastorequery")
29
30DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> getProperty)
31 : mQuery(query), mTransaction(transaction), mType(type), mTypeIndex(typeIndex), mDb(Storage::mainDatabase(mTransaction, mType)), mGetProperty(getProperty)
32{
33
34}
35
36static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType)
37{
38 // TODO use a result set with an iterator, to read values on demand
39 SinkTrace() << "Looking for : " << bufferType;
40 //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate.
41 QSet<QByteArray> keys;
42 Storage::mainDatabase(transaction, bufferType)
43 .scan(QByteArray(),
44 [&](const QByteArray &key, const QByteArray &value) -> bool {
45 if (keys.contains(Sink::Storage::uidFromKey(key))) {
46 //Not something that should persist if the replay works, so we keep a message for now.
47 SinkTrace() << "Multiple revisions for key: " << key;
48 }
49 keys << Sink::Storage::uidFromKey(key);
50 return true;
51 },
52 [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; });
53
54 SinkTrace() << "Full scan retrieved " << keys.size() << " results.";
55 return ResultSet(keys.toList().toVector());
56}
57
58ResultSet DataStoreQuery::loadInitialResultSet(QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting)
59{
60 if (!mQuery.ids.isEmpty()) {
61 return ResultSet(mQuery.ids.toVector());
62 }
63 QSet<QByteArray> appliedFilters;
64 QByteArray appliedSorting;
65
66 auto resultSet = mTypeIndex.query(mQuery, appliedFilters, appliedSorting, mTransaction);
67
68 remainingFilters = mQuery.propertyFilter.keys().toSet() - appliedFilters;
69 if (appliedSorting.isEmpty()) {
70 remainingSorting = mQuery.sortProperty;
71 }
72
73 // We do a full scan if there were no indexes available to create the initial set.
74 if (appliedFilters.isEmpty()) {
75 // TODO this should be replaced by an index lookup as well
76 resultSet = fullScan(mTransaction, mType);
77 }
78 return resultSet;
79}
80
81ResultSet DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision, QSet<QByteArray> &remainingFilters)
82{
83 const auto bufferType = mType;
84 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
85 remainingFilters = mQuery.propertyFilter.keys().toSet();
86 return ResultSet([this, bufferType, revisionCounter]() -> QByteArray {
87 const qint64 topRevision = Sink::Storage::maxRevision(mTransaction);
88 // Spit out the revision keys one by one.
89 while (*revisionCounter <= topRevision) {
90 const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter);
91 const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter);
92 // SinkTrace() << "Revision" << *revisionCounter << type << uid;
93 Q_ASSERT(!uid.isEmpty());
94 Q_ASSERT(!type.isEmpty());
95 if (type != bufferType) {
96 // Skip revision
97 *revisionCounter += 1;
98 continue;
99 }
100 const auto key = Sink::Storage::assembleKey(uid, *revisionCounter);
101 *revisionCounter += 1;
102 return key;
103 }
104 SinkTrace() << "Finished reading incremental result set:" << *revisionCounter;
105 // We're done
106 return QByteArray();
107 });
108}
109
110void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback)
111{
112 mDb.findLatest(key,
113 [=](const QByteArray &key, const QByteArray &value) -> bool {
114 resultCallback(Sink::Storage::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size()));
115 return false;
116 },
117 [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; });
118}
119
120QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArray &property)
121{
122 return mGetProperty(entity, property);
123}
124
125ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, bool initialQuery, const QByteArray &sortProperty)
126{
127 const bool sortingRequired = !sortProperty.isEmpty();
128 if (initialQuery && sortingRequired) {
129 SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty;
130 // Sort the complete set by reading the sort property and filling into a sorted map
131 auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create();
132 while (resultSet.next()) {
133 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
134 readEntity(resultSet.id(),
135 [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const QByteArray &uid, const Sink::EntityBuffer &buffer) {
136
137 const auto operation = buffer.operation();
138
139 // We're not interested in removals during the initial query
140 if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) {
141 if (!sortProperty.isEmpty()) {
142 const auto sortValue = getProperty(buffer.entity(), sortProperty);
143 if (sortValue.type() == QVariant::DateTime) {
144 sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), uid);
145 } else {
146 sortedMap->insert(sortValue.toString().toLatin1(), uid);
147 }
148 } else {
149 sortedMap->insert(uid, uid);
150 }
151 }
152 });
153 }
154
155 SinkTrace() << "Sorted " << sortedMap->size() << " values.";
156 auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap);
157 ResultSet::ValueGenerator generator = [this, iterator, sortedMap, filter, initialQuery](
158 std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entity, Sink::Operation)> callback) -> bool {
159 if (iterator->hasNext()) {
160 readEntity(iterator->next().value(), [this, filter, callback, initialQuery](const QByteArray &uid, const Sink::EntityBuffer &buffer) {
161 callback(uid, buffer, Sink::Operation_Creation);
162 });
163 return true;
164 }
165 return false;
166 };
167
168 auto skip = [iterator]() {
169 if (iterator->hasNext()) {
170 iterator->next();
171 }
172 };
173 return ResultSet(generator, skip);
174 } else {
175 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
176 ResultSet::ValueGenerator generator = [this, resultSetPtr, filter, initialQuery](const ResultSet::Callback &callback) -> bool {
177 if (resultSetPtr->next()) {
178 SinkTrace() << "Reading the next value: " << resultSetPtr->id();
179 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
180 readEntity(resultSetPtr->id(), [this, filter, callback, initialQuery](const QByteArray &uid, const Sink::EntityBuffer &buffer) {
181 const auto operation = buffer.operation();
182 if (initialQuery) {
183 // We're not interested in removals during the initial query
184 if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) {
185 // In the initial set every entity is new
186 callback(uid, buffer, Sink::Operation_Creation);
187 }
188 } else {
189 // Always remove removals, they probably don't match due to non-available properties
190 if ((operation == Sink::Operation_Removal) || filter(uid, buffer)) {
191 // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results)
192 callback(uid, buffer, operation);
193 }
194 }
195 });
196 return true;
197 }
198 return false;
199 };
200 auto skip = [resultSetPtr]() { resultSetPtr->skip(1); };
201 return ResultSet(generator, skip);
202 }
203}
204
205
206DataStoreQuery::FilterFunction DataStoreQuery::getFilter(const QSet<QByteArray> &remainingFilters)
207{
208 auto query = mQuery;
209 return [this, remainingFilters, query](const QByteArray &uid, const Sink::EntityBuffer &entity) -> bool {
210 if (!query.ids.isEmpty()) {
211 if (!query.ids.contains(uid)) {
212 SinkTrace() << "Filter by uid: " << uid;
213 return false;
214 }
215 }
216 for (const auto &filterProperty : remainingFilters) {
217 const auto property = getProperty(entity.entity(), filterProperty);
218 const auto comparator = query.propertyFilter.value(filterProperty);
219 if (!comparator.matches(property)) {
220 SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value;
221 return false;
222 }
223 }
224 return true;
225 };
226}
227
228ResultSet DataStoreQuery::update(qint64 baseRevision)
229{
230 SinkTrace() << "Executing query update";
231 QSet<QByteArray> remainingFilters;
232 QByteArray remainingSorting;
233 auto resultSet = loadIncrementalResultSet(baseRevision, remainingFilters);
234 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), false, remainingSorting);
235 return filteredSet;
236}
237
238ResultSet DataStoreQuery::execute()
239{
240 SinkTrace() << "Executing query";
241 QSet<QByteArray> remainingFilters;
242 QByteArray remainingSorting;
243 auto resultSet = loadInitialResultSet(remainingFilters, remainingSorting);
244 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), true, remainingSorting);
245 return filteredSet;
246}
diff --git a/common/datastorequery.h b/common/datastorequery.h
new file mode 100644
index 0000000..cf9d9e2
--- /dev/null
+++ b/common/datastorequery.h
@@ -0,0 +1,59 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the
16 * Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19#pragma once
20
21#include "query.h"
22#include "storage.h"
23#include "resultset.h"
24#include "typeindex.h"
25#include "query.h"
26#include "entitybuffer.h"
27
28class DataStoreQuery {
29public:
30 DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> getProperty);
31 ResultSet execute();
32 ResultSet update(qint64 baseRevision);
33
34private:
35
36 typedef std::function<bool(const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> FilterFunction;
37 typedef std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> BufferCallback;
38
39 QVariant getProperty(const Sink::Entity &entity, const QByteArray &property);
40
41 void readEntity(const QByteArray &key, const BufferCallback &resultCallback);
42
43 ResultSet loadInitialResultSet(QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting);
44 ResultSet loadIncrementalResultSet(qint64 baseRevision, QSet<QByteArray> &remainingFilters);
45
46 ResultSet filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, bool initialQuery, const QByteArray &sortProperty);
47 FilterFunction getFilter(const QSet<QByteArray> &remainingFilters);
48
49 Sink::Query mQuery;
50 Sink::Storage::Transaction &mTransaction;
51 const QByteArray mType;
52 TypeIndex &mTypeIndex;
53 Sink::Storage::NamedDatabase mDb;
54 std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> mGetProperty;
55};
56
57
58
59
diff --git a/common/domain/event.cpp b/common/domain/event.cpp
index 0909bf1..dfbcb61 100644
--- a/common/domain/event.cpp
+++ b/common/domain/event.cpp
@@ -32,6 +32,8 @@
32#include "../query.h" 32#include "../query.h"
33#include "../definitions.h" 33#include "../definitions.h"
34#include "../typeindex.h" 34#include "../typeindex.h"
35#include "entitybuffer.h"
36#include "entity_generated.h"
35 37
36#include "event_generated.h" 38#include "event_generated.h"
37 39
@@ -84,3 +86,14 @@ QSharedPointer<WritePropertyMapper<TypeImplementation<Event>::BufferBuilder> > T
84 propertyMapper->addMapping<Event::Attachment>(&BufferBuilder::add_attachment); 86 propertyMapper->addMapping<Event::Attachment>(&BufferBuilder::add_attachment);
85 return propertyMapper; 87 return propertyMapper;
86} 88}
89
90DataStoreQuery TypeImplementation<Event>::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction)
91{
92
93 auto mapper = initializeReadPropertyMapper();
94 return DataStoreQuery(query, ApplicationDomain::getTypeName<Event>(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) {
95
96 const auto localBuffer = Sink::EntityBuffer::readBuffer<Buffer>(entity.local());
97 return mapper->getProperty(property, localBuffer);
98 });
99}
diff --git a/common/domain/event.h b/common/domain/event.h
index 5315566..4ac572c 100644
--- a/common/domain/event.h
+++ b/common/domain/event.h
@@ -21,6 +21,7 @@
21#include "applicationdomaintype.h" 21#include "applicationdomaintype.h"
22 22
23#include "storage.h" 23#include "storage.h"
24#include "datastorequery.h"
24 25
25class ResultSet; 26class ResultSet;
26class QByteArray; 27class QByteArray;
@@ -50,6 +51,7 @@ public:
50 typedef Sink::ApplicationDomain::Buffer::Event Buffer; 51 typedef Sink::ApplicationDomain::Buffer::Event Buffer;
51 typedef Sink::ApplicationDomain::Buffer::EventBuilder BufferBuilder; 52 typedef Sink::ApplicationDomain::Buffer::EventBuilder BufferBuilder;
52 static QSet<QByteArray> indexedProperties(); 53 static QSet<QByteArray> indexedProperties();
54 static DataStoreQuery prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction);
53 /** 55 /**
54 * Returns the potential result set based on the indexes. 56 * Returns the potential result set based on the indexes.
55 * 57 *
diff --git a/common/domain/folder.cpp b/common/domain/folder.cpp
index ddb0c10..6d487b1 100644
--- a/common/domain/folder.cpp
+++ b/common/domain/folder.cpp
@@ -32,6 +32,8 @@
32#include "../query.h" 32#include "../query.h"
33#include "../definitions.h" 33#include "../definitions.h"
34#include "../typeindex.h" 34#include "../typeindex.h"
35#include "entitybuffer.h"
36#include "entity_generated.h"
35 37
36#include "folder_generated.h" 38#include "folder_generated.h"
37 39
@@ -88,3 +90,12 @@ QSharedPointer<WritePropertyMapper<TypeImplementation<Folder>::BufferBuilder> >
88 propertyMapper->addMapping<Folder::SpecialPurpose>(&BufferBuilder::add_specialpurpose); 90 propertyMapper->addMapping<Folder::SpecialPurpose>(&BufferBuilder::add_specialpurpose);
89 return propertyMapper; 91 return propertyMapper;
90} 92}
93
94DataStoreQuery TypeImplementation<Folder>::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction)
95{
96 auto mapper = initializeReadPropertyMapper();
97 return DataStoreQuery(query, ApplicationDomain::getTypeName<Folder>(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) {
98 const auto localBuffer = Sink::EntityBuffer::readBuffer<Buffer>(entity.local());
99 return mapper->getProperty(property, localBuffer);
100 });
101}
diff --git a/common/domain/folder.h b/common/domain/folder.h
index 6e066e1..77edc8a 100644
--- a/common/domain/folder.h
+++ b/common/domain/folder.h
@@ -21,6 +21,7 @@
21#include "applicationdomaintype.h" 21#include "applicationdomaintype.h"
22 22
23#include "storage.h" 23#include "storage.h"
24#include "datastorequery.h"
24 25
25class ResultSet; 26class ResultSet;
26class QByteArray; 27class QByteArray;
@@ -44,6 +45,7 @@ class TypeImplementation<Sink::ApplicationDomain::Folder> {
44public: 45public:
45 typedef Sink::ApplicationDomain::Buffer::Folder Buffer; 46 typedef Sink::ApplicationDomain::Buffer::Folder Buffer;
46 typedef Sink::ApplicationDomain::Buffer::FolderBuilder BufferBuilder; 47 typedef Sink::ApplicationDomain::Buffer::FolderBuilder BufferBuilder;
48 static DataStoreQuery prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction);
47 static QSet<QByteArray> indexedProperties(); 49 static QSet<QByteArray> indexedProperties();
48 static ResultSet queryIndexes(const Sink::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction); 50 static ResultSet queryIndexes(const Sink::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction);
49 static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); 51 static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction);
diff --git a/common/domain/mail.cpp b/common/domain/mail.cpp
index 13e1305..bb5ad58 100644
--- a/common/domain/mail.cpp
+++ b/common/domain/mail.cpp
@@ -32,6 +32,8 @@
32#include "../query.h" 32#include "../query.h"
33#include "../definitions.h" 33#include "../definitions.h"
34#include "../typeindex.h" 34#include "../typeindex.h"
35#include "entitybuffer.h"
36#include "entity_generated.h"
35 37
36#include "mail_generated.h" 38#include "mail_generated.h"
37 39
@@ -110,3 +112,14 @@ QSharedPointer<WritePropertyMapper<TypeImplementation<Mail>::BufferBuilder> > Ty
110 propertyMapper->addMapping<Mail::Sent>(&BufferBuilder::add_sent); 112 propertyMapper->addMapping<Mail::Sent>(&BufferBuilder::add_sent);
111 return propertyMapper; 113 return propertyMapper;
112} 114}
115
116DataStoreQuery TypeImplementation<Mail>::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction)
117{
118 auto mapper = initializeReadPropertyMapper();
119 return DataStoreQuery(query, ApplicationDomain::getTypeName<Mail>(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) {
120
121 const auto localBuffer = Sink::EntityBuffer::readBuffer<Buffer>(entity.local());
122 return mapper->getProperty(property, localBuffer);
123 });
124}
125
diff --git a/common/domain/mail.h b/common/domain/mail.h
index ff169dd..d6af9c5 100644
--- a/common/domain/mail.h
+++ b/common/domain/mail.h
@@ -21,6 +21,7 @@
21#include "applicationdomaintype.h" 21#include "applicationdomaintype.h"
22 22
23#include "storage.h" 23#include "storage.h"
24#include "datastorequery.h"
24 25
25class ResultSet; 26class ResultSet;
26class QByteArray; 27class QByteArray;
@@ -44,6 +45,7 @@ class TypeImplementation<Sink::ApplicationDomain::Mail> {
44public: 45public:
45 typedef Sink::ApplicationDomain::Buffer::Mail Buffer; 46 typedef Sink::ApplicationDomain::Buffer::Mail Buffer;
46 typedef Sink::ApplicationDomain::Buffer::MailBuilder BufferBuilder; 47 typedef Sink::ApplicationDomain::Buffer::MailBuilder BufferBuilder;
48 static DataStoreQuery prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction);
47 static QSet<QByteArray> indexedProperties(); 49 static QSet<QByteArray> indexedProperties();
48 /** 50 /**
49 * Returns the potential result set based on the indexes. 51 * Returns the potential result set based on the indexes.
diff --git a/common/entitybuffer.cpp b/common/entitybuffer.cpp
index 950bc46..32583cc 100644
--- a/common/entitybuffer.cpp
+++ b/common/entitybuffer.cpp
@@ -26,7 +26,7 @@ bool EntityBuffer::isValid() const
26 return mEntity; 26 return mEntity;
27} 27}
28 28
29const Sink::Entity &EntityBuffer::entity() 29const Sink::Entity &EntityBuffer::entity() const
30{ 30{
31 Q_ASSERT(mEntity); 31 Q_ASSERT(mEntity);
32 return *mEntity; 32 return *mEntity;
@@ -84,3 +84,15 @@ void EntityBuffer::assembleEntityBuffer(
84 auto entity = Sink::CreateEntity(fbb, metadata, resource, local); 84 auto entity = Sink::CreateEntity(fbb, metadata, resource, local);
85 Sink::FinishEntityBuffer(fbb, entity); 85 Sink::FinishEntityBuffer(fbb, entity);
86} 86}
87
88Sink::Operation EntityBuffer::operation() const
89{
90 const auto metadataBuffer = readBuffer<Sink::Metadata>(mEntity->metadata());
91 return metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation;
92}
93
94qint64 EntityBuffer::revision() const
95{
96 const auto metadataBuffer = readBuffer<Sink::Metadata>(mEntity->metadata());
97 return metadataBuffer ? metadataBuffer->revision() : -1;
98}
diff --git a/common/entitybuffer.h b/common/entitybuffer.h
index 866a7d0..4162605 100644
--- a/common/entitybuffer.h
+++ b/common/entitybuffer.h
@@ -3,6 +3,7 @@
3#include "sink_export.h" 3#include "sink_export.h"
4#include <functional> 4#include <functional>
5#include <flatbuffers/flatbuffers.h> 5#include <flatbuffers/flatbuffers.h>
6#include "metadata_generated.h"
6#include <QByteArray> 7#include <QByteArray>
7 8
8namespace Sink { 9namespace Sink {
@@ -16,9 +17,12 @@ public:
16 const uint8_t *resourceBuffer(); 17 const uint8_t *resourceBuffer();
17 const uint8_t *metadataBuffer(); 18 const uint8_t *metadataBuffer();
18 const uint8_t *localBuffer(); 19 const uint8_t *localBuffer();
19 const Entity &entity(); 20 const Entity &entity() const;
20 bool isValid() const; 21 bool isValid() const;
21 22
23 Sink::Operation operation() const;
24 qint64 revision() const;
25
22 static void extractResourceBuffer(void *dataValue, int dataSize, const std::function<void(const uint8_t *, size_t size)> &handler); 26 static void extractResourceBuffer(void *dataValue, int dataSize, const std::function<void(const uint8_t *, size_t size)> &handler);
23 /* 27 /*
24 * TODO: Ideally we would be passing references to vectors in the same bufferbuilder, to avoid needlessly copying data. 28 * TODO: Ideally we would be passing references to vectors in the same bufferbuilder, to avoid needlessly copying data.
diff --git a/common/entityreader.cpp b/common/entityreader.cpp
index 01c25d2..faa154b 100644
--- a/common/entityreader.cpp
+++ b/common/entityreader.cpp
@@ -150,205 +150,41 @@ void EntityReader<DomainType>::query(const Sink::Query &query, const std::functi
150 }); 150 });
151} 151}
152 152
153template <class DomainType> 153/* template <class DomainType> */
154void EntityReader<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, 154/* void EntityReader<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, */
155 const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback) 155/* const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback) */
156{ 156/* { */
157 db.findLatest(key, 157/* db.findLatest(key, */
158 [=](const QByteArray &key, const QByteArray &value) -> bool { 158/* [=](const QByteArray &key, const QByteArray &value) -> bool { */
159 Sink::EntityBuffer buffer(value.data(), value.size()); 159/* Sink::EntityBuffer buffer(value.data(), value.size()); */
160 const Sink::Entity &entity = buffer.entity(); 160/* const Sink::Entity &entity = buffer.entity(); */
161 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); 161/* const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); */
162 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; 162/* const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; */
163 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; 163/* const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; */
164 auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(entity); 164/* auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(entity); */
165 Q_ASSERT(adaptor); 165/* Q_ASSERT(adaptor); */
166 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); 166/* resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); */
167 return false; 167/* return false; */
168 }, 168/* }, */
169 [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); 169/* [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); */
170} 170/* } */
171
172static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType)
173{
174 // TODO use a result set with an iterator, to read values on demand
175 SinkTrace() << "Looking for : " << bufferType;
176 //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate.
177 QSet<QByteArray> keys;
178 Storage::mainDatabase(transaction, bufferType)
179 .scan(QByteArray(),
180 [&](const QByteArray &key, const QByteArray &value) -> bool {
181 if (keys.contains(Sink::Storage::uidFromKey(key))) {
182 //Not something that should persist if the replay works, so we keep a message for now.
183 SinkTrace() << "Multiple revisions for key: " << key;
184 }
185 keys << Sink::Storage::uidFromKey(key);
186 return true;
187 },
188 [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; });
189
190 SinkTrace() << "Full scan retrieved " << keys.size() << " results.";
191 return ResultSet(keys.toList().toVector());
192}
193
194template <class DomainType>
195ResultSet EntityReader<DomainType>::loadInitialResultSet(const Sink::Query &query, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting)
196{
197 if (!query.ids.isEmpty()) {
198 return ResultSet(query.ids.toVector());
199 }
200 QSet<QByteArray> appliedFilters;
201 QByteArray appliedSorting;
202 auto resultSet = Sink::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, appliedSorting, mTransaction);
203 remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters;
204 if (appliedSorting.isEmpty()) {
205 remainingSorting = query.sortProperty;
206 }
207 171
208 // We do a full scan if there were no indexes available to create the initial set.
209 if (appliedFilters.isEmpty()) {
210 // TODO this should be replaced by an index lookup as well
211 resultSet = fullScan(mTransaction, ApplicationDomain::getTypeName<DomainType>());
212 }
213 return resultSet;
214}
215
216template <class DomainType>
217ResultSet EntityReader<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, QSet<QByteArray> &remainingFilters)
218{
219 const auto bufferType = ApplicationDomain::getTypeName<DomainType>();
220 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
221 remainingFilters = query.propertyFilter.keys().toSet();
222 return ResultSet([this, bufferType, revisionCounter]() -> QByteArray {
223 const qint64 topRevision = Sink::Storage::maxRevision(mTransaction);
224 // Spit out the revision keys one by one.
225 while (*revisionCounter <= topRevision) {
226 const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter);
227 const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter);
228 // SinkTrace() << "Revision" << *revisionCounter << type << uid;
229 Q_ASSERT(!uid.isEmpty());
230 Q_ASSERT(!type.isEmpty());
231 if (type != bufferType) {
232 // Skip revision
233 *revisionCounter += 1;
234 continue;
235 }
236 const auto key = Sink::Storage::assembleKey(uid, *revisionCounter);
237 *revisionCounter += 1;
238 return key;
239 }
240 SinkTrace() << "Finished reading incremental result set:" << *revisionCounter;
241 // We're done
242 return QByteArray();
243 });
244}
245
246template <class DomainType>
247ResultSet EntityReader<DomainType>::filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter,
248 const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty)
249{
250 const bool sortingRequired = !sortProperty.isEmpty();
251 if (initialQuery && sortingRequired) {
252 SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty;
253 // Sort the complete set by reading the sort property and filling into a sorted map
254 auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create();
255 while (resultSet.next()) {
256 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
257 readEntity(db, resultSet.id(),
258 [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) {
259 // We're not interested in removals during the initial query
260 if ((operation != Sink::Operation_Removal) && filter(domainObject)) {
261 if (!sortProperty.isEmpty()) {
262 const auto sortValue = domainObject->getProperty(sortProperty);
263 if (sortValue.type() == QVariant::DateTime) {
264 sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), domainObject->identifier());
265 } else {
266 sortedMap->insert(sortValue.toString().toLatin1(), domainObject->identifier());
267 }
268 } else {
269 sortedMap->insert(domainObject->identifier(), domainObject->identifier());
270 }
271 }
272 });
273 }
274
275 SinkTrace() << "Sorted " << sortedMap->size() << " values.";
276 auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap);
277 ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery](
278 std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool {
279 if (iterator->hasNext()) {
280 readEntity(db, iterator->next().value(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject,
281 Sink::Operation operation) { callback(domainObject, Sink::Operation_Creation); });
282 return true;
283 }
284 return false;
285 };
286 172
287 auto skip = [iterator]() {
288 if (iterator->hasNext()) {
289 iterator->next();
290 }
291 };
292 return ResultSet(generator, skip);
293 } else {
294 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
295 ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery](
296 std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool {
297 if (resultSetPtr->next()) {
298 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
299 readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) {
300 if (initialQuery) {
301 // We're not interested in removals during the initial query
302 if ((operation != Sink::Operation_Removal) && filter(domainObject)) {
303 // In the initial set every entity is new
304 callback(domainObject, Sink::Operation_Creation);
305 }
306 } else {
307 // Always remove removals, they probably don't match due to non-available properties
308 if ((operation == Sink::Operation_Removal) || filter(domainObject)) {
309 // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results)
310 callback(domainObject, operation);
311 }
312 }
313 });
314 return true;
315 }
316 return false;
317 };
318 auto skip = [resultSetPtr]() { resultSetPtr->skip(1); };
319 return ResultSet(generator, skip);
320 }
321}
322 173
323template <class DomainType> 174template <class DomainType>
324QPair<qint64, qint64> EntityReader<DomainType>::load(const Sink::Query &query, const std::function<ResultSet(QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, bool initialQuery, int offset, int batchSize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback) 175QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback)
325{ 176{
326 QTime time; 177 QTime time;
327 time.start(); 178 time.start();
328 179
329 auto db = Storage::mainDatabase(mTransaction, ApplicationDomain::getTypeName<DomainType>()); 180 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction);
181 auto resultSet = preparedQuery.execute();
330 182
331 QSet<QByteArray> remainingFilters;
332 QByteArray remainingSorting;
333 auto resultSet = baseSetRetriever(remainingFilters, remainingSorting);
334 SinkTrace() << "Base set retrieved. " << Log::TraceTime(time.elapsed());
335 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, remainingSorting);
336 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 183 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
337 auto replayedEntities = replaySet(filteredSet, offset, batchSize, callback); 184 auto replayedEntities = replaySet(resultSet, offset, batchsize, callback);
338 // SinkTrace() << "Filtered set replayed. " << Log::TraceTime(time.elapsed());
339 return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities);
340}
341 185
342template <class DomainType>
343QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback)
344{
345 QTime time;
346 time.start();
347 auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet {
348 return loadInitialResultSet(query, remainingFilters, remainingSorting);
349 }, true, offset, batchsize, callback);
350 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); 186 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed());
351 return revisionAndReplayedEntities; 187 return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities);
352} 188}
353 189
354template <class DomainType> 190template <class DomainType>
@@ -357,33 +193,15 @@ QPair<qint64, qint64> EntityReader<DomainType>::executeIncrementalQuery(const Si
357 QTime time; 193 QTime time;
358 time.start(); 194 time.start();
359 const qint64 baseRevision = lastRevision + 1; 195 const qint64 baseRevision = lastRevision + 1;
360 auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet {
361 return loadIncrementalResultSet(baseRevision, query, remainingFilters);
362 }, false, 0, 0, callback);
363 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed());
364 return revisionAndReplayedEntities;
365}
366 196
367template <class DomainType> 197 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction);
368std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> 198 auto resultSet = preparedQuery.update(baseRevision);
369EntityReader<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query) 199
370{ 200 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
371 return [this, remainingFilters, query](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { 201 auto replayedEntities = replaySet(resultSet, 0, 0, callback);
372 if (!query.ids.isEmpty()) { 202
373 if (!query.ids.contains(domainObject->identifier())) { 203 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed());
374 return false; 204 return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities);
375 }
376 }
377 for (const auto &filterProperty : remainingFilters) {
378 const auto property = domainObject->getProperty(filterProperty);
379 const auto comparator = query.propertyFilter.value(filterProperty);
380 if (!comparator.matches(property)) {
381 SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value;
382 return false;
383 }
384 }
385 return true;
386 };
387} 205}
388 206
389template <class DomainType> 207template <class DomainType>
@@ -394,9 +212,11 @@ qint64 EntityReader<DomainType>::replaySet(ResultSet &resultSet, int offset, int
394 int counter = 0; 212 int counter = 0;
395 while (!batchSize || (counter < batchSize)) { 213 while (!batchSize || (counter < batchSize)) {
396 const bool ret = 214 const bool ret =
397 resultSet.next([&counter, callback](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { 215 resultSet.next([this, &counter, callback](const QByteArray &uid, const Sink::EntityBuffer &value, Sink::Operation operation) -> bool {
398 counter++; 216 counter++;
399 return callback(value.staticCast<DomainType>(), operation); 217 auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(value.entity());
218 Q_ASSERT(adaptor);
219 return callback(QSharedPointer<DomainType>::create(mResourceInstanceIdentifier, uid, value.revision(), adaptor), operation);
400 }); 220 });
401 if (!ret) { 221 if (!ret) {
402 break; 222 break;
diff --git a/common/entityreader.h b/common/entityreader.h
index a479679..f216453 100644
--- a/common/entityreader.h
+++ b/common/entityreader.h
@@ -89,17 +89,6 @@ public:
89private: 89private:
90 qint64 replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback); 90 qint64 replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback);
91 91
92 void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key,
93 const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback);
94
95 ResultSet loadInitialResultSet(const Sink::Query &query, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting);
96 ResultSet loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, QSet<QByteArray> &remainingFilters);
97
98 ResultSet filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter,
99 const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty);
100 std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query);
101 QPair<qint64, qint64> load(const Sink::Query &query, const std::function<ResultSet(QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, bool initialQuery, int offset, int batchSize, const ResultCallback &callback);
102
103private: 92private:
104 QByteArray mResourceInstanceIdentifier; 93 QByteArray mResourceInstanceIdentifier;
105 Sink::Storage::Transaction &mTransaction; 94 Sink::Storage::Transaction &mTransaction;
diff --git a/common/resultset.cpp b/common/resultset.cpp
index 293035b..51914e9 100644
--- a/common/resultset.cpp
+++ b/common/resultset.cpp
@@ -18,7 +18,7 @@
18 */ 18 */
19#include "resultset.h" 19#include "resultset.h"
20 20
21#include "common/log.h" 21#include "log.h"
22 22
23ResultSet::ResultSet() : mIt(nullptr) 23ResultSet::ResultSet() : mIt(nullptr)
24{ 24{
@@ -78,12 +78,12 @@ bool ResultSet::next()
78 return true; 78 return true;
79 } 79 }
80 } else { 80 } else {
81 next([](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation) { return false; }); 81 next([](const QByteArray &, const Sink::EntityBuffer &, Sink::Operation) { return false; });
82 } 82 }
83 return false; 83 return false;
84} 84}
85 85
86bool ResultSet::next(std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation)> callback) 86bool ResultSet::next(const Callback &callback)
87{ 87{
88 Q_ASSERT(mValueGenerator); 88 Q_ASSERT(mValueGenerator);
89 return mValueGenerator(callback); 89 return mValueGenerator(callback);
diff --git a/common/resultset.h b/common/resultset.h
index 88f7055..4e934fc 100644
--- a/common/resultset.h
+++ b/common/resultset.h
@@ -20,8 +20,8 @@
20 20
21#include <QVector> 21#include <QVector>
22#include <functional> 22#include <functional>
23#include "domain/applicationdomaintype.h"
24#include "metadata_generated.h" 23#include "metadata_generated.h"
24#include "entitybuffer.h"
25 25
26/* 26/*
27 * An iterator to a result set. 27 * An iterator to a result set.
@@ -31,7 +31,8 @@
31class ResultSet 31class ResultSet
32{ 32{
33public: 33public:
34 typedef std::function<bool(std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)>)> ValueGenerator; 34 typedef std::function<void(const QByteArray &uid, const Sink::EntityBuffer &, Sink::Operation)> Callback;
35 typedef std::function<bool(Callback)> ValueGenerator;
35 typedef std::function<QByteArray()> IdGenerator; 36 typedef std::function<QByteArray()> IdGenerator;
36 typedef std::function<void()> SkipValue; 37 typedef std::function<void()> SkipValue;
37 38
@@ -42,7 +43,7 @@ public:
42 ResultSet(const ResultSet &other); 43 ResultSet(const ResultSet &other);
43 44
44 bool next(); 45 bool next();
45 bool next(std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation)> callback); 46 bool next(const Callback &callback);
46 47
47 void skip(int number); 48 void skip(int number);
48 49