diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-11 13:06:27 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-11 15:26:32 +0100 |
commit | 4e6b3ce7d1ce97c3e1fb9ae53c5b2be1787acc6b (patch) | |
tree | 08d80bbe1cc890f949110a760cba18e8f2b5249b /common/synchronizer.cpp | |
parent | e5bec3abfe2f2463244d65386dbd1088bf56f5f3 (diff) | |
download | sink-4e6b3ce7d1ce97c3e1fb9ae53c5b2be1787acc6b.tar.gz sink-4e6b3ce7d1ce97c3e1fb9ae53c5b2be1787acc6b.zip |
Prepared new query based synchronization API
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r-- | common/synchronizer.cpp | 52 |
1 files changed, 51 insertions, 1 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 85c68e4..713387e 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp | |||
@@ -222,21 +222,71 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray | |||
222 | } | 222 | } |
223 | } | 223 | } |
224 | 224 | ||
225 | QByteArrayList Synchronizer::resolveFilter(const QueryBase::Comparator &filter) | ||
226 | { | ||
227 | QByteArrayList result; | ||
228 | if (filter.value.canConvert<QByteArray>()) { | ||
229 | result << filter.value.value<QByteArray>(); | ||
230 | } else if (filter.value.canConvert<QueryBase>()) { | ||
231 | auto query = filter.value.value<QueryBase>(); | ||
232 | Storage::EntityStore store{mResourceContext}; | ||
233 | DataStoreQuery dataStoreQuery{query, query.type(), store}; | ||
234 | auto resultSet = dataStoreQuery.execute(); | ||
235 | resultSet.replaySet(0, 0, [this, &result](const ResultSet::Result &r) { | ||
236 | result << r.entity.identifier(); | ||
237 | }); | ||
238 | } else { | ||
239 | SinkWarning() << "unknown filter type: " << filter; | ||
240 | Q_ASSERT(false); | ||
241 | } | ||
242 | return result; | ||
243 | } | ||
244 | |||
225 | template<typename DomainType> | 245 | template<typename DomainType> |
226 | void Synchronizer::modify(const DomainType &entity) | 246 | void Synchronizer::modify(const DomainType &entity) |
227 | { | 247 | { |
228 | modifyEntity(entity.identifier(), entity.revision(), ApplicationDomain::getTypeName<DomainType>(), entity); | 248 | modifyEntity(entity.identifier(), entity.revision(), ApplicationDomain::getTypeName<DomainType>(), entity); |
229 | } | 249 | } |
230 | 250 | ||
251 | QList<Synchronizer::SyncRequest> Synchronizer::getSyncRequests(const Sink::QueryBase &query) | ||
252 | { | ||
253 | QList<Synchronizer::SyncRequest> list; | ||
254 | list << Synchronizer::SyncRequest{query}; | ||
255 | return list; | ||
256 | } | ||
257 | |||
231 | KAsync::Job<void> Synchronizer::synchronize(const Sink::QueryBase &query) | 258 | KAsync::Job<void> Synchronizer::synchronize(const Sink::QueryBase &query) |
232 | { | 259 | { |
233 | SinkTrace() << "Synchronizing"; | 260 | SinkTrace() << "Synchronizing"; |
261 | mSyncRequestQueue << getSyncRequests(query); | ||
262 | return processSyncQueue(); | ||
263 | } | ||
264 | |||
265 | KAsync::Job<void> Synchronizer::processSyncQueue() | ||
266 | { | ||
267 | if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { | ||
268 | return KAsync::null<void>(); | ||
269 | } | ||
234 | mSyncInProgress = true; | 270 | mSyncInProgress = true; |
235 | mMessageQueue->startTransaction(); | 271 | mMessageQueue->startTransaction(); |
236 | return synchronizeWithSource(query).syncThen<void>([this]() { | 272 | |
273 | auto job = KAsync::null<void>(); | ||
274 | while (!mSyncRequestQueue.isEmpty()) { | ||
275 | auto request = mSyncRequestQueue.takeFirst(); | ||
276 | job = job.then(synchronizeWithSource(request.query)).syncThen<void>([this] { | ||
277 | //Commit after every request, so implementations only have to commit more if they add a lot of data. | ||
278 | commit(); | ||
279 | }); | ||
280 | } | ||
281 | return job.then<void>([this](const KAsync::Error &error) { | ||
237 | mSyncStore.clear(); | 282 | mSyncStore.clear(); |
238 | mMessageQueue->commit(); | 283 | mMessageQueue->commit(); |
239 | mSyncInProgress = false; | 284 | mSyncInProgress = false; |
285 | if (error) { | ||
286 | SinkWarning() << "Error during sync: " << error.errorMessage; | ||
287 | return KAsync::error(error); | ||
288 | } | ||
289 | return KAsync::null<void>(); | ||
240 | }); | 290 | }); |
241 | } | 291 | } |
242 | 292 | ||