summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-11 13:06:27 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-11 15:26:32 +0100
commit4e6b3ce7d1ce97c3e1fb9ae53c5b2be1787acc6b (patch)
tree08d80bbe1cc890f949110a760cba18e8f2b5249b /common
parente5bec3abfe2f2463244d65386dbd1088bf56f5f3 (diff)
downloadsink-4e6b3ce7d1ce97c3e1fb9ae53c5b2be1787acc6b.tar.gz
sink-4e6b3ce7d1ce97c3e1fb9ae53c5b2be1787acc6b.zip
Prepared new query based synchronization API
Diffstat (limited to 'common')
-rw-r--r--common/query.h15
-rw-r--r--common/remoteidmap.cpp9
-rw-r--r--common/remoteidmap.h2
-rw-r--r--common/synchronizer.cpp52
-rw-r--r--common/synchronizer.h32
5 files changed, 109 insertions, 1 deletions
diff --git a/common/query.h b/common/query.h
index 925b014..aa2d643 100644
--- a/common/query.h
+++ b/common/query.h
@@ -55,6 +55,9 @@ public:
55 bool operator==(const Filter &other) const; 55 bool operator==(const Filter &other) const;
56 }; 56 };
57 57
58 QueryBase() = default;
59 QueryBase(const QByteArray &type) : mType(type) {}
60
58 bool operator==(const QueryBase &other) const; 61 bool operator==(const QueryBase &other) const;
59 62
60 Comparator getFilter(const QByteArray &property) const 63 Comparator getFilter(const QByteArray &property) const
@@ -62,11 +65,23 @@ public:
62 return mBaseFilterStage.propertyFilter.value(property); 65 return mBaseFilterStage.propertyFilter.value(property);
63 } 66 }
64 67
68 template <class T>
69 Comparator getFilter() const
70 {
71 return getFilter(T::name);
72 }
73
65 bool hasFilter(const QByteArray &property) const 74 bool hasFilter(const QByteArray &property) const
66 { 75 {
67 return mBaseFilterStage.propertyFilter.contains(property); 76 return mBaseFilterStage.propertyFilter.contains(property);
68 } 77 }
69 78
79 template <class T>
80 bool hasFilter() const
81 {
82 return hasFilter(T::name);
83 }
84
70 void setBaseFilters(const QHash<QByteArray, Comparator> &filter) 85 void setBaseFilters(const QHash<QByteArray, Comparator> &filter)
71 { 86 {
72 mBaseFilterStage.propertyFilter = filter; 87 mBaseFilterStage.propertyFilter = filter;
diff --git a/common/remoteidmap.cpp b/common/remoteidmap.cpp
index da57cf6..a16473d 100644
--- a/common/remoteidmap.cpp
+++ b/common/remoteidmap.cpp
@@ -75,6 +75,15 @@ QByteArray RemoteIdMap::resolveLocalId(const QByteArray &bufferType, const QByte
75 return remoteId; 75 return remoteId;
76} 76}
77 77
78QByteArrayList RemoteIdMap::resolveLocalIds(const QByteArray &bufferType, const QByteArrayList &localIds)
79{
80 QByteArrayList result;
81 for (const auto &l : localIds) {
82 result << resolveLocalId(bufferType, l);
83 }
84 return result;
85}
86
78QByteArray RemoteIdMap::readValue(const QByteArray &key) 87QByteArray RemoteIdMap::readValue(const QByteArray &key)
79{ 88{
80 QByteArray value; 89 QByteArray value;
diff --git a/common/remoteidmap.h b/common/remoteidmap.h
index 32c5efd..52e05d7 100644
--- a/common/remoteidmap.h
+++ b/common/remoteidmap.h
@@ -22,6 +22,7 @@
22#include "sink_export.h" 22#include "sink_export.h"
23 23
24#include "storage.h" 24#include "storage.h"
25#include <QByteArrayList>
25 26
26namespace Sink { 27namespace Sink {
27 28
@@ -53,6 +54,7 @@ public:
53 * This can fail if the entity hasn't been written back to the server yet. 54 * This can fail if the entity hasn't been written back to the server yet.
54 */ 55 */
55 QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId); 56 QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId);
57 QByteArrayList resolveLocalIds(const QByteArray &bufferType, const QByteArrayList &localId);
56 58
57 QByteArray readValue(const QByteArray &key); 59 QByteArray readValue(const QByteArray &key);
58 void writeValue(const QByteArray &key, const QByteArray &value); 60 void writeValue(const QByteArray &key, const QByteArray &value);
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 85c68e4..713387e 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -222,21 +222,71 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray
222 } 222 }
223} 223}
224 224
225QByteArrayList Synchronizer::resolveFilter(const QueryBase::Comparator &filter)
226{
227 QByteArrayList result;
228 if (filter.value.canConvert<QByteArray>()) {
229 result << filter.value.value<QByteArray>();
230 } else if (filter.value.canConvert<QueryBase>()) {
231 auto query = filter.value.value<QueryBase>();
232 Storage::EntityStore store{mResourceContext};
233 DataStoreQuery dataStoreQuery{query, query.type(), store};
234 auto resultSet = dataStoreQuery.execute();
235 resultSet.replaySet(0, 0, [this, &result](const ResultSet::Result &r) {
236 result << r.entity.identifier();
237 });
238 } else {
239 SinkWarning() << "unknown filter type: " << filter;
240 Q_ASSERT(false);
241 }
242 return result;
243}
244
225template<typename DomainType> 245template<typename DomainType>
226void Synchronizer::modify(const DomainType &entity) 246void Synchronizer::modify(const DomainType &entity)
227{ 247{
228 modifyEntity(entity.identifier(), entity.revision(), ApplicationDomain::getTypeName<DomainType>(), entity); 248 modifyEntity(entity.identifier(), entity.revision(), ApplicationDomain::getTypeName<DomainType>(), entity);
229} 249}
230 250
251QList<Synchronizer::SyncRequest> Synchronizer::getSyncRequests(const Sink::QueryBase &query)
252{
253 QList<Synchronizer::SyncRequest> list;
254 list << Synchronizer::SyncRequest{query};
255 return list;
256}
257
231KAsync::Job<void> Synchronizer::synchronize(const Sink::QueryBase &query) 258KAsync::Job<void> Synchronizer::synchronize(const Sink::QueryBase &query)
232{ 259{
233 SinkTrace() << "Synchronizing"; 260 SinkTrace() << "Synchronizing";
261 mSyncRequestQueue << getSyncRequests(query);
262 return processSyncQueue();
263}
264
265KAsync::Job<void> Synchronizer::processSyncQueue()
266{
267 if (mSyncRequestQueue.isEmpty() || mSyncInProgress) {
268 return KAsync::null<void>();
269 }
234 mSyncInProgress = true; 270 mSyncInProgress = true;
235 mMessageQueue->startTransaction(); 271 mMessageQueue->startTransaction();
236 return synchronizeWithSource(query).syncThen<void>([this]() { 272
273 auto job = KAsync::null<void>();
274 while (!mSyncRequestQueue.isEmpty()) {
275 auto request = mSyncRequestQueue.takeFirst();
276 job = job.then(synchronizeWithSource(request.query)).syncThen<void>([this] {
277 //Commit after every request, so implementations only have to commit more if they add a lot of data.
278 commit();
279 });
280 }
281 return job.then<void>([this](const KAsync::Error &error) {
237 mSyncStore.clear(); 282 mSyncStore.clear();
238 mMessageQueue->commit(); 283 mMessageQueue->commit();
239 mSyncInProgress = false; 284 mSyncInProgress = false;
285 if (error) {
286 SinkWarning() << "Error during sync: " << error.errorMessage;
287 return KAsync::error(error);
288 }
289 return KAsync::null<void>();
240 }); 290 });
241} 291}
242 292
diff --git a/common/synchronizer.h b/common/synchronizer.h
index c03c425..47518ee 100644
--- a/common/synchronizer.h
+++ b/common/synchronizer.h
@@ -90,11 +90,42 @@ protected:
90 void modify(const DomainType &entity); 90 void modify(const DomainType &entity);
91 // template <typename DomainType> 91 // template <typename DomainType>
92 // void remove(const DomainType &entity); 92 // void remove(const DomainType &entity);
93 QByteArrayList resolveFilter(const QueryBase::Comparator &filter);
93 94
94 virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) = 0; 95 virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) = 0;
95 96
97 struct SyncRequest {
98 SyncRequest(const Sink::QueryBase &q)
99 : flushQueue(false),
100 query(q)
101 {
102 }
103
104 bool flushQueue;
105 Sink::QueryBase query;
106 };
107
108 /**
109 * This allows the synchronizer to turn a single query into multiple synchronization requests.
110 *
111 * The idea is the following;
112 * The input query is a specification by the application of what data needs to be made available.
113 * Requests could be:
114 * * Give me everything (signified by the default constructed/empty query)
115 * * Give me all mails of folder X
116 * * Give me all mails of folders matching some constraints
117 *
118 * getSyncRequests allows the resource implementation to apply it's own defaults to that request;
119 * * While a maildir resource might give you always all emails of a folder, an IMAP resource might have a date limit, to i.e. only retrieve the last 14 days worth of data.
120 * * A resource get's to define what "give me everything" means. For email that may be turned into first a requests for folders, and then a request for all emails in those folders.
121 *
122 * This will allow synchronizeWithSource to focus on just getting to the content.
123 */
124 virtual QList<Synchronizer::SyncRequest> getSyncRequests(const Sink::QueryBase &query);
125
96private: 126private:
97 void modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity); 127 void modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity);
128 KAsync::Job<void> processSyncQueue();
98 129
99 Sink::ResourceContext mResourceContext; 130 Sink::ResourceContext mResourceContext;
100 Sink::Storage::EntityStore::Ptr mEntityStore; 131 Sink::Storage::EntityStore::Ptr mEntityStore;
@@ -102,6 +133,7 @@ private:
102 Sink::Storage::DataStore mSyncStorage; 133 Sink::Storage::DataStore mSyncStorage;
103 Sink::Storage::DataStore::Transaction mSyncTransaction; 134 Sink::Storage::DataStore::Transaction mSyncTransaction;
104 std::function<void(int commandId, const QByteArray &data)> mEnqueue; 135 std::function<void(int commandId, const QByteArray &data)> mEnqueue;
136 QList<SyncRequest> mSyncRequestQueue;
105 MessageQueue *mMessageQueue; 137 MessageQueue *mMessageQueue;
106 bool mSyncInProgress; 138 bool mSyncInProgress;
107}; 139};