summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-11 13:06:27 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-11 15:26:32 +0100
commit4e6b3ce7d1ce97c3e1fb9ae53c5b2be1787acc6b (patch)
tree08d80bbe1cc890f949110a760cba18e8f2b5249b
parente5bec3abfe2f2463244d65386dbd1088bf56f5f3 (diff)
downloadsink-4e6b3ce7d1ce97c3e1fb9ae53c5b2be1787acc6b.tar.gz
sink-4e6b3ce7d1ce97c3e1fb9ae53c5b2be1787acc6b.zip
Prepared new query based synchronization API
-rw-r--r--common/query.h15
-rw-r--r--common/remoteidmap.cpp9
-rw-r--r--common/remoteidmap.h2
-rw-r--r--common/synchronizer.cpp52
-rw-r--r--common/synchronizer.h32
-rw-r--r--examples/imapresource/imapresource.cpp121
-rw-r--r--examples/imapresource/imapserverproxy.h16
-rw-r--r--examples/maildirresource/maildirresource.cpp54
8 files changed, 258 insertions, 43 deletions
diff --git a/common/query.h b/common/query.h
index 925b014..aa2d643 100644
--- a/common/query.h
+++ b/common/query.h
@@ -55,6 +55,9 @@ public:
55 bool operator==(const Filter &other) const; 55 bool operator==(const Filter &other) const;
56 }; 56 };
57 57
58 QueryBase() = default;
59 QueryBase(const QByteArray &type) : mType(type) {}
60
58 bool operator==(const QueryBase &other) const; 61 bool operator==(const QueryBase &other) const;
59 62
60 Comparator getFilter(const QByteArray &property) const 63 Comparator getFilter(const QByteArray &property) const
@@ -62,11 +65,23 @@ public:
62 return mBaseFilterStage.propertyFilter.value(property); 65 return mBaseFilterStage.propertyFilter.value(property);
63 } 66 }
64 67
68 template <class T>
69 Comparator getFilter() const
70 {
71 return getFilter(T::name);
72 }
73
65 bool hasFilter(const QByteArray &property) const 74 bool hasFilter(const QByteArray &property) const
66 { 75 {
67 return mBaseFilterStage.propertyFilter.contains(property); 76 return mBaseFilterStage.propertyFilter.contains(property);
68 } 77 }
69 78
79 template <class T>
80 bool hasFilter() const
81 {
82 return hasFilter(T::name);
83 }
84
70 void setBaseFilters(const QHash<QByteArray, Comparator> &filter) 85 void setBaseFilters(const QHash<QByteArray, Comparator> &filter)
71 { 86 {
72 mBaseFilterStage.propertyFilter = filter; 87 mBaseFilterStage.propertyFilter = filter;
diff --git a/common/remoteidmap.cpp b/common/remoteidmap.cpp
index da57cf6..a16473d 100644
--- a/common/remoteidmap.cpp
+++ b/common/remoteidmap.cpp
@@ -75,6 +75,15 @@ QByteArray RemoteIdMap::resolveLocalId(const QByteArray &bufferType, const QByte
75 return remoteId; 75 return remoteId;
76} 76}
77 77
78QByteArrayList RemoteIdMap::resolveLocalIds(const QByteArray &bufferType, const QByteArrayList &localIds)
79{
80 QByteArrayList result;
81 for (const auto &l : localIds) {
82 result << resolveLocalId(bufferType, l);
83 }
84 return result;
85}
86
78QByteArray RemoteIdMap::readValue(const QByteArray &key) 87QByteArray RemoteIdMap::readValue(const QByteArray &key)
79{ 88{
80 QByteArray value; 89 QByteArray value;
diff --git a/common/remoteidmap.h b/common/remoteidmap.h
index 32c5efd..52e05d7 100644
--- a/common/remoteidmap.h
+++ b/common/remoteidmap.h
@@ -22,6 +22,7 @@
22#include "sink_export.h" 22#include "sink_export.h"
23 23
24#include "storage.h" 24#include "storage.h"
25#include <QByteArrayList>
25 26
26namespace Sink { 27namespace Sink {
27 28
@@ -53,6 +54,7 @@ public:
53 * This can fail if the entity hasn't been written back to the server yet. 54 * This can fail if the entity hasn't been written back to the server yet.
54 */ 55 */
55 QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId); 56 QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId);
57 QByteArrayList resolveLocalIds(const QByteArray &bufferType, const QByteArrayList &localId);
56 58
57 QByteArray readValue(const QByteArray &key); 59 QByteArray readValue(const QByteArray &key);
58 void writeValue(const QByteArray &key, const QByteArray &value); 60 void writeValue(const QByteArray &key, const QByteArray &value);
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 85c68e4..713387e 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -222,21 +222,71 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray
222 } 222 }
223} 223}
224 224
225QByteArrayList Synchronizer::resolveFilter(const QueryBase::Comparator &filter)
226{
227 QByteArrayList result;
228 if (filter.value.canConvert<QByteArray>()) {
229 result << filter.value.value<QByteArray>();
230 } else if (filter.value.canConvert<QueryBase>()) {
231 auto query = filter.value.value<QueryBase>();
232 Storage::EntityStore store{mResourceContext};
233 DataStoreQuery dataStoreQuery{query, query.type(), store};
234 auto resultSet = dataStoreQuery.execute();
235 resultSet.replaySet(0, 0, [this, &result](const ResultSet::Result &r) {
236 result << r.entity.identifier();
237 });
238 } else {
239 SinkWarning() << "unknown filter type: " << filter;
240 Q_ASSERT(false);
241 }
242 return result;
243}
244
225template<typename DomainType> 245template<typename DomainType>
226void Synchronizer::modify(const DomainType &entity) 246void Synchronizer::modify(const DomainType &entity)
227{ 247{
228 modifyEntity(entity.identifier(), entity.revision(), ApplicationDomain::getTypeName<DomainType>(), entity); 248 modifyEntity(entity.identifier(), entity.revision(), ApplicationDomain::getTypeName<DomainType>(), entity);
229} 249}
230 250
251QList<Synchronizer::SyncRequest> Synchronizer::getSyncRequests(const Sink::QueryBase &query)
252{
253 QList<Synchronizer::SyncRequest> list;
254 list << Synchronizer::SyncRequest{query};
255 return list;
256}
257
231KAsync::Job<void> Synchronizer::synchronize(const Sink::QueryBase &query) 258KAsync::Job<void> Synchronizer::synchronize(const Sink::QueryBase &query)
232{ 259{
233 SinkTrace() << "Synchronizing"; 260 SinkTrace() << "Synchronizing";
261 mSyncRequestQueue << getSyncRequests(query);
262 return processSyncQueue();
263}
264
265KAsync::Job<void> Synchronizer::processSyncQueue()
266{
267 if (mSyncRequestQueue.isEmpty() || mSyncInProgress) {
268 return KAsync::null<void>();
269 }
234 mSyncInProgress = true; 270 mSyncInProgress = true;
235 mMessageQueue->startTransaction(); 271 mMessageQueue->startTransaction();
236 return synchronizeWithSource(query).syncThen<void>([this]() { 272
273 auto job = KAsync::null<void>();
274 while (!mSyncRequestQueue.isEmpty()) {
275 auto request = mSyncRequestQueue.takeFirst();
276 job = job.then(synchronizeWithSource(request.query)).syncThen<void>([this] {
277 //Commit after every request, so implementations only have to commit more if they add a lot of data.
278 commit();
279 });
280 }
281 return job.then<void>([this](const KAsync::Error &error) {
237 mSyncStore.clear(); 282 mSyncStore.clear();
238 mMessageQueue->commit(); 283 mMessageQueue->commit();
239 mSyncInProgress = false; 284 mSyncInProgress = false;
285 if (error) {
286 SinkWarning() << "Error during sync: " << error.errorMessage;
287 return KAsync::error(error);
288 }
289 return KAsync::null<void>();
240 }); 290 });
241} 291}
242 292
diff --git a/common/synchronizer.h b/common/synchronizer.h
index c03c425..47518ee 100644
--- a/common/synchronizer.h
+++ b/common/synchronizer.h
@@ -90,11 +90,42 @@ protected:
90 void modify(const DomainType &entity); 90 void modify(const DomainType &entity);
91 // template <typename DomainType> 91 // template <typename DomainType>
92 // void remove(const DomainType &entity); 92 // void remove(const DomainType &entity);
93 QByteArrayList resolveFilter(const QueryBase::Comparator &filter);
93 94
94 virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) = 0; 95 virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) = 0;
95 96
97 struct SyncRequest {
98 SyncRequest(const Sink::QueryBase &q)
99 : flushQueue(false),
100 query(q)
101 {
102 }
103
104 bool flushQueue;
105 Sink::QueryBase query;
106 };
107
108 /**
109 * This allows the synchronizer to turn a single query into multiple synchronization requests.
110 *
111 * The idea is the following;
112 * The input query is a specification by the application of what data needs to be made available.
113 * Requests could be:
114 * * Give me everything (signified by the default constructed/empty query)
115 * * Give me all mails of folder X
116 * * Give me all mails of folders matching some constraints
117 *
118 * getSyncRequests allows the resource implementation to apply it's own defaults to that request;
119 * * While a maildir resource might give you always all emails of a folder, an IMAP resource might have a date limit, to i.e. only retrieve the last 14 days worth of data.
120 * * A resource get's to define what "give me everything" means. For email that may be turned into first a requests for folders, and then a request for all emails in those folders.
121 *
122 * This will allow synchronizeWithSource to focus on just getting to the content.
123 */
124 virtual QList<Synchronizer::SyncRequest> getSyncRequests(const Sink::QueryBase &query);
125
96private: 126private:
97 void modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity); 127 void modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity);
128 KAsync::Job<void> processSyncQueue();
98 129
99 Sink::ResourceContext mResourceContext; 130 Sink::ResourceContext mResourceContext;
100 Sink::Storage::EntityStore::Ptr mEntityStore; 131 Sink::Storage::EntityStore::Ptr mEntityStore;
@@ -102,6 +133,7 @@ private:
102 Sink::Storage::DataStore mSyncStorage; 133 Sink::Storage::DataStore mSyncStorage;
103 Sink::Storage::DataStore::Transaction mSyncTransaction; 134 Sink::Storage::DataStore::Transaction mSyncTransaction;
104 std::function<void(int commandId, const QByteArray &data)> mEnqueue; 135 std::function<void(int commandId, const QByteArray &data)> mEnqueue;
136 QList<SyncRequest> mSyncRequestQueue;
105 MessageQueue *mMessageQueue; 137 MessageQueue *mMessageQueue;
106 bool mSyncInProgress; 138 bool mSyncInProgress;
107}; 139};
diff --git a/examples/imapresource/imapresource.cpp b/examples/imapresource/imapresource.cpp
index d5a59b9..302d681 100644
--- a/examples/imapresource/imapresource.cpp
+++ b/examples/imapresource/imapresource.cpp
@@ -272,41 +272,102 @@ public:
272 272
273 } 273 }
274 274
275 KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) Q_DECL_OVERRIDE 275 QList<Synchronizer::SyncRequest> getSyncRequests(const Sink::QueryBase &query) Q_DECL_OVERRIDE
276 { 276 {
277 SinkLog() << " Synchronizing"; 277 QList<Synchronizer::SyncRequest> list;
278 return KAsync::start<void>([this]() { 278 if (query.type() == ApplicationDomain::getTypeName<ApplicationDomain::Mail>()) {
279 SinkTrace() << "Connecting to:" << mServer << mPort; 279 list << Synchronizer::SyncRequest{query};
280 SinkTrace() << "as:" << mUser; 280 } else if (query.type() == ApplicationDomain::getTypeName<ApplicationDomain::Folder>()) {
281 auto imap = QSharedPointer<ImapServerProxy>::create(mServer, mPort); 281 list << Synchronizer::SyncRequest{query};
282 } else {
283 list << Synchronizer::SyncRequest{Sink::QueryBase(ApplicationDomain::getTypeName<ApplicationDomain::Folder>())};
284 list << Synchronizer::SyncRequest{Sink::QueryBase(ApplicationDomain::getTypeName<ApplicationDomain::Mail>())};
285 }
286 return list;
287 }
282 288
283 return imap->login(mUser, mPassword) 289 KAsync::Job<void> login(QSharedPointer<ImapServerProxy> imap)
284 .addToContext(imap) 290 {
285 .onError([](const KAsync::Error &error) { 291 SinkTrace() << "Connecting to:" << mServer << mPort;
286 SinkWarning() << "Login failed."; 292 SinkTrace() << "as:" << mUser;
287 }) 293 return imap->login(mUser, mPassword)
288 .then<QVector<Folder>>([this, imap]() { 294 .addToContext(imap)
289 auto folderList = QSharedPointer<QVector<Folder>>::create(); 295 .onError([](const KAsync::Error &error) {
290 SinkLog() << "Login was successful"; 296 SinkWarning() << "Login failed.";
291 return imap->fetchFolders([folderList](const Folder &folder) { 297 });
292 *folderList << folder; 298 }
293 }) 299
294 .onError([](const KAsync::Error &error) { 300 KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) Q_DECL_OVERRIDE
295 SinkWarning() << "Folder list sync failed."; 301 {
302 if (query.type() == ApplicationDomain::getTypeName<ApplicationDomain::Folder>()) {
303 return KAsync::start<void>([this]() {
304 auto imap = QSharedPointer<ImapServerProxy>::create(mServer, mPort);
305 auto job = login(imap);
306 job = job.then<QVector<Folder>>([this, imap]() {
307 auto folderList = QSharedPointer<QVector<Folder>>::create();
308 return imap->fetchFolders([folderList](const Folder &folder) {
309 *folderList << folder;
310 })
311 .onError([](const KAsync::Error &error) {
312 SinkWarning() << "Folder list sync failed.";
313 })
314 .syncThen<QVector<Folder>>([this, folderList]() {
315 synchronizeFolders(*folderList);
316 commit();
317 return *folderList;
318 });
319 });
320 return job;
321 });
322 } else if (query.type() == ApplicationDomain::getTypeName<ApplicationDomain::Mail>()) {
323 //TODO
324 //if we have a folder filter:
325 //* execute the folder query and resolve the results to the remote identifier
326 //* query only those folders
327 //if we have a date filter:
328 //* apply the date filter to the fetch
329 //if we have no folder filter:
330 //* fetch list of folders from server directly and sync (because we have no guarantee that the folder sync was already processed by the pipeline).
331 return KAsync::start<void>([this, query]() {
332 auto imap = QSharedPointer<ImapServerProxy>::create(mServer, mPort);
333 auto job = login(imap);
334 job = job.then<QVector<Folder>>([this, imap, query]() {
335 SinkLog() << "Login was successful";
336 //FIXME If we were able to to flush in between we could just query the local store for the folder list.
337 //
338 if (query.hasFilter<ApplicationDomain::Mail::Folder>()) {
339 QVector<Folder> folders;
340 auto folderFilter = query.getFilter<ApplicationDomain::Mail::Folder>();
341 auto localIds = resolveFilter(folderFilter);
342 auto folderRemoteIds = syncStore().resolveLocalIds(ApplicationDomain::getTypeName<ApplicationDomain::Folder>(), localIds);
343 for (const auto &r : folderRemoteIds) {
344 folders << Folder{r};
345 }
346 return KAsync::value(folders);
347 } else {
348 auto folderList = QSharedPointer<QVector<Folder>>::create();
349 return imap->fetchFolders([folderList](const Folder &folder) {
350 *folderList << folder;
351 })
352 .onError([](const KAsync::Error &error) {
353 SinkWarning() << "Folder list sync failed.";
354 })
355 .syncThen<QVector<Folder>>([this, folderList]() {
356 return *folderList;
357 });
358 }
296 }) 359 })
297 .syncThen<QVector<Folder>>([this, folderList]() { 360 .serialEach<void>([this, imap](const Folder &folder) {
298 synchronizeFolders(*folderList); 361 if (folder.noselect) {
299 commit(); 362 return KAsync::null<void>();
300 return *folderList; 363 }
364 return synchronizeFolder(imap, folder);
301 }); 365 });
302 }) 366
303 .serialEach<void>([this, imap](const Folder &folder) { 367 return job;
304 if (folder.noselect) {
305 return KAsync::null<void>();
306 }
307 return synchronizeFolder(imap, folder);
308 }); 368 });
309 }); 369 }
370 return KAsync::error<void>("Nothing to do");
310 } 371 }
311 372
312public: 373public:
diff --git a/examples/imapresource/imapserverproxy.h b/examples/imapresource/imapserverproxy.h
index 8b39f23..6e6626e 100644
--- a/examples/imapresource/imapserverproxy.h
+++ b/examples/imapresource/imapserverproxy.h
@@ -58,6 +58,20 @@ struct Message {
58}; 58};
59 59
60struct Folder { 60struct Folder {
61 Folder() = default;
62 Folder(QList<QString> pathParts_, const QString &path_, const QChar &separator_, bool noselect_)
63 : pathParts(pathParts_),
64 path(path_),
65 separator(separator_),
66 noselect(noselect_)
67 {
68 }
69
70 Folder(const QString &path_)
71 : path(path_)
72 {
73 }
74
61 QString normalizedPath() const 75 QString normalizedPath() const
62 { 76 {
63 return pathParts.join('/'); 77 return pathParts.join('/');
@@ -73,7 +87,7 @@ struct Folder {
73 QList<QString> pathParts; 87 QList<QString> pathParts;
74 QString path; 88 QString path;
75 QChar separator; 89 QChar separator;
76 bool noselect; 90 bool noselect = false;
77}; 91};
78 92
79struct SelectResult { 93struct SelectResult {
diff --git a/examples/maildirresource/maildirresource.cpp b/examples/maildirresource/maildirresource.cpp
index 820ec2f..fc77315 100644
--- a/examples/maildirresource/maildirresource.cpp
+++ b/examples/maildirresource/maildirresource.cpp
@@ -330,25 +330,57 @@ public:
330 SinkLog() << "Synchronized " << count << " mails in " << listingPath << Sink::Log::TraceTime(elapsed) << " " << elapsed/qMax(count, 1) << " [ms/mail]"; 330 SinkLog() << "Synchronized " << count << " mails in " << listingPath << Sink::Log::TraceTime(elapsed) << " " << elapsed/qMax(count, 1) << " [ms/mail]";
331 } 331 }
332 332
333 QList<Synchronizer::SyncRequest> getSyncRequests(const Sink::QueryBase &query) Q_DECL_OVERRIDE
334 {
335 QList<Synchronizer::SyncRequest> list;
336 if (!query.type().isEmpty()) {
337 //We want to synchronize something specific
338 list << Synchronizer::SyncRequest{query};
339 } else {
340 //We want to synchronize everything
341 list << Synchronizer::SyncRequest{Sink::QueryBase(ApplicationDomain::getTypeName<ApplicationDomain::Folder>())};
342 //FIXME we can't process the second synchronization before the pipeline of the first one is processed, otherwise we can't execute a query on the local data.
343 /* list << Synchronizer::SyncRequest{Flush}; */
344 list << Synchronizer::SyncRequest{Sink::QueryBase(ApplicationDomain::getTypeName<ApplicationDomain::Mail>())};
345 }
346 return list;
347 }
348
333 KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) Q_DECL_OVERRIDE 349 KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) Q_DECL_OVERRIDE
334 { 350 {
335 SinkLog() << " Synchronizing"; 351 auto job = KAsync::start<void>([this] {
336 return KAsync::start<void>([this]() {
337 KPIM::Maildir maildir(mMaildirPath, true); 352 KPIM::Maildir maildir(mMaildirPath, true);
338 if (!maildir.isValid(false)) { 353 if (!maildir.isValid(false)) {
339 return KAsync::error<void>(1, "Maildir path doesn't point to a valid maildir: " + mMaildirPath); 354 return KAsync::error<void>(1, "Maildir path doesn't point to a valid maildir: " + mMaildirPath);
340 } 355 }
341 synchronizeFolders();
342 //The next sync needs the folders available
343 commit();
344 for (const auto &folder : listAvailableFolders()) {
345 synchronizeMails(folder);
346 //Don't let the transaction grow too much
347 commit();
348 }
349 SinkLog() << "Done Synchronizing";
350 return KAsync::null<void>(); 356 return KAsync::null<void>();
351 }); 357 });
358
359 if (query.type() == ApplicationDomain::getTypeName<ApplicationDomain::Folder>()) {
360 job = job.syncThen<void>([this] {
361 synchronizeFolders();
362 });
363 } else if (query.type() == ApplicationDomain::getTypeName<ApplicationDomain::Mail>()) {
364 job = job.syncThen<void>([this, query] {
365 QStringList folders;
366 if (query.hasFilter<ApplicationDomain::Mail::Folder>()) {
367 auto folderFilter = query.getFilter<ApplicationDomain::Mail::Folder>();
368 auto localIds = resolveFilter(folderFilter);
369 auto folderRemoteIds = syncStore().resolveLocalIds(ApplicationDomain::getTypeName<ApplicationDomain::Folder>(), localIds);
370 for (const auto &r : folderRemoteIds) {
371 folders << r;
372 }
373 } else {
374 folders = listAvailableFolders();
375 }
376 for (const auto &folder : folders) {
377 synchronizeMails(folder);
378 //Don't let the transaction grow too much
379 commit();
380 }
381 });
382 }
383 return job;
352 } 384 }
353 385
354public: 386public: