summaryrefslogtreecommitdiffstats
path: root/common/synchronizer.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-11 13:06:27 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-11 15:26:32 +0100
commit4e6b3ce7d1ce97c3e1fb9ae53c5b2be1787acc6b (patch)
tree08d80bbe1cc890f949110a760cba18e8f2b5249b /common/synchronizer.cpp
parente5bec3abfe2f2463244d65386dbd1088bf56f5f3 (diff)
downloadsink-4e6b3ce7d1ce97c3e1fb9ae53c5b2be1787acc6b.tar.gz
sink-4e6b3ce7d1ce97c3e1fb9ae53c5b2be1787acc6b.zip
Prepared new query based synchronization API
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r--common/synchronizer.cpp52
1 files changed, 51 insertions, 1 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 85c68e4..713387e 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -222,21 +222,71 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray
222 } 222 }
223} 223}
224 224
225QByteArrayList Synchronizer::resolveFilter(const QueryBase::Comparator &filter)
226{
227 QByteArrayList result;
228 if (filter.value.canConvert<QByteArray>()) {
229 result << filter.value.value<QByteArray>();
230 } else if (filter.value.canConvert<QueryBase>()) {
231 auto query = filter.value.value<QueryBase>();
232 Storage::EntityStore store{mResourceContext};
233 DataStoreQuery dataStoreQuery{query, query.type(), store};
234 auto resultSet = dataStoreQuery.execute();
235 resultSet.replaySet(0, 0, [this, &result](const ResultSet::Result &r) {
236 result << r.entity.identifier();
237 });
238 } else {
239 SinkWarning() << "unknown filter type: " << filter;
240 Q_ASSERT(false);
241 }
242 return result;
243}
244
225template<typename DomainType> 245template<typename DomainType>
226void Synchronizer::modify(const DomainType &entity) 246void Synchronizer::modify(const DomainType &entity)
227{ 247{
228 modifyEntity(entity.identifier(), entity.revision(), ApplicationDomain::getTypeName<DomainType>(), entity); 248 modifyEntity(entity.identifier(), entity.revision(), ApplicationDomain::getTypeName<DomainType>(), entity);
229} 249}
230 250
251QList<Synchronizer::SyncRequest> Synchronizer::getSyncRequests(const Sink::QueryBase &query)
252{
253 QList<Synchronizer::SyncRequest> list;
254 list << Synchronizer::SyncRequest{query};
255 return list;
256}
257
231KAsync::Job<void> Synchronizer::synchronize(const Sink::QueryBase &query) 258KAsync::Job<void> Synchronizer::synchronize(const Sink::QueryBase &query)
232{ 259{
233 SinkTrace() << "Synchronizing"; 260 SinkTrace() << "Synchronizing";
261 mSyncRequestQueue << getSyncRequests(query);
262 return processSyncQueue();
263}
264
265KAsync::Job<void> Synchronizer::processSyncQueue()
266{
267 if (mSyncRequestQueue.isEmpty() || mSyncInProgress) {
268 return KAsync::null<void>();
269 }
234 mSyncInProgress = true; 270 mSyncInProgress = true;
235 mMessageQueue->startTransaction(); 271 mMessageQueue->startTransaction();
236 return synchronizeWithSource(query).syncThen<void>([this]() { 272
273 auto job = KAsync::null<void>();
274 while (!mSyncRequestQueue.isEmpty()) {
275 auto request = mSyncRequestQueue.takeFirst();
276 job = job.then(synchronizeWithSource(request.query)).syncThen<void>([this] {
277 //Commit after every request, so implementations only have to commit more if they add a lot of data.
278 commit();
279 });
280 }
281 return job.then<void>([this](const KAsync::Error &error) {
237 mSyncStore.clear(); 282 mSyncStore.clear();
238 mMessageQueue->commit(); 283 mMessageQueue->commit();
239 mSyncInProgress = false; 284 mSyncInProgress = false;
285 if (error) {
286 SinkWarning() << "Error during sync: " << error.errorMessage;
287 return KAsync::error(error);
288 }
289 return KAsync::null<void>();
240 }); 290 });
241} 291}
242 292