diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-11 13:06:27 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-11 15:26:32 +0100 |
commit | 4e6b3ce7d1ce97c3e1fb9ae53c5b2be1787acc6b (patch) | |
tree | 08d80bbe1cc890f949110a760cba18e8f2b5249b /common | |
parent | e5bec3abfe2f2463244d65386dbd1088bf56f5f3 (diff) | |
download | sink-4e6b3ce7d1ce97c3e1fb9ae53c5b2be1787acc6b.tar.gz sink-4e6b3ce7d1ce97c3e1fb9ae53c5b2be1787acc6b.zip |
Prepared new query based synchronization API
Diffstat (limited to 'common')
-rw-r--r-- | common/query.h | 15 | ||||
-rw-r--r-- | common/remoteidmap.cpp | 9 | ||||
-rw-r--r-- | common/remoteidmap.h | 2 | ||||
-rw-r--r-- | common/synchronizer.cpp | 52 | ||||
-rw-r--r-- | common/synchronizer.h | 32 |
5 files changed, 109 insertions, 1 deletions
diff --git a/common/query.h b/common/query.h index 925b014..aa2d643 100644 --- a/common/query.h +++ b/common/query.h | |||
@@ -55,6 +55,9 @@ public: | |||
55 | bool operator==(const Filter &other) const; | 55 | bool operator==(const Filter &other) const; |
56 | }; | 56 | }; |
57 | 57 | ||
58 | QueryBase() = default; | ||
59 | QueryBase(const QByteArray &type) : mType(type) {} | ||
60 | |||
58 | bool operator==(const QueryBase &other) const; | 61 | bool operator==(const QueryBase &other) const; |
59 | 62 | ||
60 | Comparator getFilter(const QByteArray &property) const | 63 | Comparator getFilter(const QByteArray &property) const |
@@ -62,11 +65,23 @@ public: | |||
62 | return mBaseFilterStage.propertyFilter.value(property); | 65 | return mBaseFilterStage.propertyFilter.value(property); |
63 | } | 66 | } |
64 | 67 | ||
68 | template <class T> | ||
69 | Comparator getFilter() const | ||
70 | { | ||
71 | return getFilter(T::name); | ||
72 | } | ||
73 | |||
65 | bool hasFilter(const QByteArray &property) const | 74 | bool hasFilter(const QByteArray &property) const |
66 | { | 75 | { |
67 | return mBaseFilterStage.propertyFilter.contains(property); | 76 | return mBaseFilterStage.propertyFilter.contains(property); |
68 | } | 77 | } |
69 | 78 | ||
79 | template <class T> | ||
80 | bool hasFilter() const | ||
81 | { | ||
82 | return hasFilter(T::name); | ||
83 | } | ||
84 | |||
70 | void setBaseFilters(const QHash<QByteArray, Comparator> &filter) | 85 | void setBaseFilters(const QHash<QByteArray, Comparator> &filter) |
71 | { | 86 | { |
72 | mBaseFilterStage.propertyFilter = filter; | 87 | mBaseFilterStage.propertyFilter = filter; |
diff --git a/common/remoteidmap.cpp b/common/remoteidmap.cpp index da57cf6..a16473d 100644 --- a/common/remoteidmap.cpp +++ b/common/remoteidmap.cpp | |||
@@ -75,6 +75,15 @@ QByteArray RemoteIdMap::resolveLocalId(const QByteArray &bufferType, const QByte | |||
75 | return remoteId; | 75 | return remoteId; |
76 | } | 76 | } |
77 | 77 | ||
78 | QByteArrayList RemoteIdMap::resolveLocalIds(const QByteArray &bufferType, const QByteArrayList &localIds) | ||
79 | { | ||
80 | QByteArrayList result; | ||
81 | for (const auto &l : localIds) { | ||
82 | result << resolveLocalId(bufferType, l); | ||
83 | } | ||
84 | return result; | ||
85 | } | ||
86 | |||
78 | QByteArray RemoteIdMap::readValue(const QByteArray &key) | 87 | QByteArray RemoteIdMap::readValue(const QByteArray &key) |
79 | { | 88 | { |
80 | QByteArray value; | 89 | QByteArray value; |
diff --git a/common/remoteidmap.h b/common/remoteidmap.h index 32c5efd..52e05d7 100644 --- a/common/remoteidmap.h +++ b/common/remoteidmap.h | |||
@@ -22,6 +22,7 @@ | |||
22 | #include "sink_export.h" | 22 | #include "sink_export.h" |
23 | 23 | ||
24 | #include "storage.h" | 24 | #include "storage.h" |
25 | #include <QByteArrayList> | ||
25 | 26 | ||
26 | namespace Sink { | 27 | namespace Sink { |
27 | 28 | ||
@@ -53,6 +54,7 @@ public: | |||
53 | * This can fail if the entity hasn't been written back to the server yet. | 54 | * This can fail if the entity hasn't been written back to the server yet. |
54 | */ | 55 | */ |
55 | QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId); | 56 | QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId); |
57 | QByteArrayList resolveLocalIds(const QByteArray &bufferType, const QByteArrayList &localId); | ||
56 | 58 | ||
57 | QByteArray readValue(const QByteArray &key); | 59 | QByteArray readValue(const QByteArray &key); |
58 | void writeValue(const QByteArray &key, const QByteArray &value); | 60 | void writeValue(const QByteArray &key, const QByteArray &value); |
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 85c68e4..713387e 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp | |||
@@ -222,21 +222,71 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray | |||
222 | } | 222 | } |
223 | } | 223 | } |
224 | 224 | ||
225 | QByteArrayList Synchronizer::resolveFilter(const QueryBase::Comparator &filter) | ||
226 | { | ||
227 | QByteArrayList result; | ||
228 | if (filter.value.canConvert<QByteArray>()) { | ||
229 | result << filter.value.value<QByteArray>(); | ||
230 | } else if (filter.value.canConvert<QueryBase>()) { | ||
231 | auto query = filter.value.value<QueryBase>(); | ||
232 | Storage::EntityStore store{mResourceContext}; | ||
233 | DataStoreQuery dataStoreQuery{query, query.type(), store}; | ||
234 | auto resultSet = dataStoreQuery.execute(); | ||
235 | resultSet.replaySet(0, 0, [this, &result](const ResultSet::Result &r) { | ||
236 | result << r.entity.identifier(); | ||
237 | }); | ||
238 | } else { | ||
239 | SinkWarning() << "unknown filter type: " << filter; | ||
240 | Q_ASSERT(false); | ||
241 | } | ||
242 | return result; | ||
243 | } | ||
244 | |||
225 | template<typename DomainType> | 245 | template<typename DomainType> |
226 | void Synchronizer::modify(const DomainType &entity) | 246 | void Synchronizer::modify(const DomainType &entity) |
227 | { | 247 | { |
228 | modifyEntity(entity.identifier(), entity.revision(), ApplicationDomain::getTypeName<DomainType>(), entity); | 248 | modifyEntity(entity.identifier(), entity.revision(), ApplicationDomain::getTypeName<DomainType>(), entity); |
229 | } | 249 | } |
230 | 250 | ||
251 | QList<Synchronizer::SyncRequest> Synchronizer::getSyncRequests(const Sink::QueryBase &query) | ||
252 | { | ||
253 | QList<Synchronizer::SyncRequest> list; | ||
254 | list << Synchronizer::SyncRequest{query}; | ||
255 | return list; | ||
256 | } | ||
257 | |||
231 | KAsync::Job<void> Synchronizer::synchronize(const Sink::QueryBase &query) | 258 | KAsync::Job<void> Synchronizer::synchronize(const Sink::QueryBase &query) |
232 | { | 259 | { |
233 | SinkTrace() << "Synchronizing"; | 260 | SinkTrace() << "Synchronizing"; |
261 | mSyncRequestQueue << getSyncRequests(query); | ||
262 | return processSyncQueue(); | ||
263 | } | ||
264 | |||
265 | KAsync::Job<void> Synchronizer::processSyncQueue() | ||
266 | { | ||
267 | if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { | ||
268 | return KAsync::null<void>(); | ||
269 | } | ||
234 | mSyncInProgress = true; | 270 | mSyncInProgress = true; |
235 | mMessageQueue->startTransaction(); | 271 | mMessageQueue->startTransaction(); |
236 | return synchronizeWithSource(query).syncThen<void>([this]() { | 272 | |
273 | auto job = KAsync::null<void>(); | ||
274 | while (!mSyncRequestQueue.isEmpty()) { | ||
275 | auto request = mSyncRequestQueue.takeFirst(); | ||
276 | job = job.then(synchronizeWithSource(request.query)).syncThen<void>([this] { | ||
277 | //Commit after every request, so implementations only have to commit more if they add a lot of data. | ||
278 | commit(); | ||
279 | }); | ||
280 | } | ||
281 | return job.then<void>([this](const KAsync::Error &error) { | ||
237 | mSyncStore.clear(); | 282 | mSyncStore.clear(); |
238 | mMessageQueue->commit(); | 283 | mMessageQueue->commit(); |
239 | mSyncInProgress = false; | 284 | mSyncInProgress = false; |
285 | if (error) { | ||
286 | SinkWarning() << "Error during sync: " << error.errorMessage; | ||
287 | return KAsync::error(error); | ||
288 | } | ||
289 | return KAsync::null<void>(); | ||
240 | }); | 290 | }); |
241 | } | 291 | } |
242 | 292 | ||
diff --git a/common/synchronizer.h b/common/synchronizer.h index c03c425..47518ee 100644 --- a/common/synchronizer.h +++ b/common/synchronizer.h | |||
@@ -90,11 +90,42 @@ protected: | |||
90 | void modify(const DomainType &entity); | 90 | void modify(const DomainType &entity); |
91 | // template <typename DomainType> | 91 | // template <typename DomainType> |
92 | // void remove(const DomainType &entity); | 92 | // void remove(const DomainType &entity); |
93 | QByteArrayList resolveFilter(const QueryBase::Comparator &filter); | ||
93 | 94 | ||
94 | virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) = 0; | 95 | virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) = 0; |
95 | 96 | ||
97 | struct SyncRequest { | ||
98 | SyncRequest(const Sink::QueryBase &q) | ||
99 | : flushQueue(false), | ||
100 | query(q) | ||
101 | { | ||
102 | } | ||
103 | |||
104 | bool flushQueue; | ||
105 | Sink::QueryBase query; | ||
106 | }; | ||
107 | |||
108 | /** | ||
109 | * This allows the synchronizer to turn a single query into multiple synchronization requests. | ||
110 | * | ||
111 | * The idea is the following; | ||
112 | * The input query is a specification by the application of what data needs to be made available. | ||
113 | * Requests could be: | ||
114 | * * Give me everything (signified by the default constructed/empty query) | ||
115 | * * Give me all mails of folder X | ||
116 | * * Give me all mails of folders matching some constraints | ||
117 | * | ||
118 | * getSyncRequests allows the resource implementation to apply it's own defaults to that request; | ||
119 | * * While a maildir resource might give you always all emails of a folder, an IMAP resource might have a date limit, to i.e. only retrieve the last 14 days worth of data. | ||
120 | * * A resource get's to define what "give me everything" means. For email that may be turned into first a requests for folders, and then a request for all emails in those folders. | ||
121 | * | ||
122 | * This will allow synchronizeWithSource to focus on just getting to the content. | ||
123 | */ | ||
124 | virtual QList<Synchronizer::SyncRequest> getSyncRequests(const Sink::QueryBase &query); | ||
125 | |||
96 | private: | 126 | private: |
97 | void modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity); | 127 | void modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity); |
128 | KAsync::Job<void> processSyncQueue(); | ||
98 | 129 | ||
99 | Sink::ResourceContext mResourceContext; | 130 | Sink::ResourceContext mResourceContext; |
100 | Sink::Storage::EntityStore::Ptr mEntityStore; | 131 | Sink::Storage::EntityStore::Ptr mEntityStore; |
@@ -102,6 +133,7 @@ private: | |||
102 | Sink::Storage::DataStore mSyncStorage; | 133 | Sink::Storage::DataStore mSyncStorage; |
103 | Sink::Storage::DataStore::Transaction mSyncTransaction; | 134 | Sink::Storage::DataStore::Transaction mSyncTransaction; |
104 | std::function<void(int commandId, const QByteArray &data)> mEnqueue; | 135 | std::function<void(int commandId, const QByteArray &data)> mEnqueue; |
136 | QList<SyncRequest> mSyncRequestQueue; | ||
105 | MessageQueue *mMessageQueue; | 137 | MessageQueue *mMessageQueue; |
106 | bool mSyncInProgress; | 138 | bool mSyncInProgress; |
107 | }; | 139 | }; |