From 4e6b3ce7d1ce97c3e1fb9ae53c5b2be1787acc6b Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Fri, 11 Nov 2016 13:06:27 +0100 Subject: Prepared new query based synchronization API --- common/query.h | 15 ++++++++++++++ common/remoteidmap.cpp | 9 +++++++++ common/remoteidmap.h | 2 ++ common/synchronizer.cpp | 52 ++++++++++++++++++++++++++++++++++++++++++++++++- common/synchronizer.h | 32 ++++++++++++++++++++++++++++++ 5 files changed, 109 insertions(+), 1 deletion(-) (limited to 'common') diff --git a/common/query.h b/common/query.h index 925b014..aa2d643 100644 --- a/common/query.h +++ b/common/query.h @@ -55,6 +55,9 @@ public: bool operator==(const Filter &other) const; }; + QueryBase() = default; + QueryBase(const QByteArray &type) : mType(type) {} + bool operator==(const QueryBase &other) const; Comparator getFilter(const QByteArray &property) const @@ -62,11 +65,23 @@ public: return mBaseFilterStage.propertyFilter.value(property); } + template + Comparator getFilter() const + { + return getFilter(T::name); + } + bool hasFilter(const QByteArray &property) const { return mBaseFilterStage.propertyFilter.contains(property); } + template + bool hasFilter() const + { + return hasFilter(T::name); + } + void setBaseFilters(const QHash &filter) { mBaseFilterStage.propertyFilter = filter; diff --git a/common/remoteidmap.cpp b/common/remoteidmap.cpp index da57cf6..a16473d 100644 --- a/common/remoteidmap.cpp +++ b/common/remoteidmap.cpp @@ -75,6 +75,15 @@ QByteArray RemoteIdMap::resolveLocalId(const QByteArray &bufferType, const QByte return remoteId; } +QByteArrayList RemoteIdMap::resolveLocalIds(const QByteArray &bufferType, const QByteArrayList &localIds) +{ + QByteArrayList result; + for (const auto &l : localIds) { + result << resolveLocalId(bufferType, l); + } + return result; +} + QByteArray RemoteIdMap::readValue(const QByteArray &key) { QByteArray value; diff --git a/common/remoteidmap.h b/common/remoteidmap.h index 32c5efd..52e05d7 100644 --- a/common/remoteidmap.h +++ b/common/remoteidmap.h @@ -22,6 +22,7 @@ #include "sink_export.h" #include "storage.h" +#include namespace Sink { @@ -53,6 +54,7 @@ public: * This can fail if the entity hasn't been written back to the server yet. */ QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId); + QByteArrayList resolveLocalIds(const QByteArray &bufferType, const QByteArrayList &localId); QByteArray readValue(const QByteArray &key); void writeValue(const QByteArray &key, const QByteArray &value); diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 85c68e4..713387e 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp @@ -222,21 +222,71 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray } } +QByteArrayList Synchronizer::resolveFilter(const QueryBase::Comparator &filter) +{ + QByteArrayList result; + if (filter.value.canConvert()) { + result << filter.value.value(); + } else if (filter.value.canConvert()) { + auto query = filter.value.value(); + Storage::EntityStore store{mResourceContext}; + DataStoreQuery dataStoreQuery{query, query.type(), store}; + auto resultSet = dataStoreQuery.execute(); + resultSet.replaySet(0, 0, [this, &result](const ResultSet::Result &r) { + result << r.entity.identifier(); + }); + } else { + SinkWarning() << "unknown filter type: " << filter; + Q_ASSERT(false); + } + return result; +} + template void Synchronizer::modify(const DomainType &entity) { modifyEntity(entity.identifier(), entity.revision(), ApplicationDomain::getTypeName(), entity); } +QList Synchronizer::getSyncRequests(const Sink::QueryBase &query) +{ + QList list; + list << Synchronizer::SyncRequest{query}; + return list; +} + KAsync::Job Synchronizer::synchronize(const Sink::QueryBase &query) { SinkTrace() << "Synchronizing"; + mSyncRequestQueue << getSyncRequests(query); + return processSyncQueue(); +} + +KAsync::Job Synchronizer::processSyncQueue() +{ + if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { + return KAsync::null(); + } mSyncInProgress = true; mMessageQueue->startTransaction(); - return synchronizeWithSource(query).syncThen([this]() { + + auto job = KAsync::null(); + while (!mSyncRequestQueue.isEmpty()) { + auto request = mSyncRequestQueue.takeFirst(); + job = job.then(synchronizeWithSource(request.query)).syncThen([this] { + //Commit after every request, so implementations only have to commit more if they add a lot of data. + commit(); + }); + } + return job.then([this](const KAsync::Error &error) { mSyncStore.clear(); mMessageQueue->commit(); mSyncInProgress = false; + if (error) { + SinkWarning() << "Error during sync: " << error.errorMessage; + return KAsync::error(error); + } + return KAsync::null(); }); } diff --git a/common/synchronizer.h b/common/synchronizer.h index c03c425..47518ee 100644 --- a/common/synchronizer.h +++ b/common/synchronizer.h @@ -90,11 +90,42 @@ protected: void modify(const DomainType &entity); // template // void remove(const DomainType &entity); + QByteArrayList resolveFilter(const QueryBase::Comparator &filter); virtual KAsync::Job synchronizeWithSource(const Sink::QueryBase &query) = 0; + struct SyncRequest { + SyncRequest(const Sink::QueryBase &q) + : flushQueue(false), + query(q) + { + } + + bool flushQueue; + Sink::QueryBase query; + }; + + /** + * This allows the synchronizer to turn a single query into multiple synchronization requests. + * + * The idea is the following; + * The input query is a specification by the application of what data needs to be made available. + * Requests could be: + * * Give me everything (signified by the default constructed/empty query) + * * Give me all mails of folder X + * * Give me all mails of folders matching some constraints + * + * getSyncRequests allows the resource implementation to apply it's own defaults to that request; + * * While a maildir resource might give you always all emails of a folder, an IMAP resource might have a date limit, to i.e. only retrieve the last 14 days worth of data. + * * A resource get's to define what "give me everything" means. For email that may be turned into first a requests for folders, and then a request for all emails in those folders. + * + * This will allow synchronizeWithSource to focus on just getting to the content. + */ + virtual QList getSyncRequests(const Sink::QueryBase &query); + private: void modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity); + KAsync::Job processSyncQueue(); Sink::ResourceContext mResourceContext; Sink::Storage::EntityStore::Ptr mEntityStore; @@ -102,6 +133,7 @@ private: Sink::Storage::DataStore mSyncStorage; Sink::Storage::DataStore::Transaction mSyncTransaction; std::function mEnqueue; + QList mSyncRequestQueue; MessageQueue *mMessageQueue; bool mSyncInProgress; }; -- cgit v1.2.3