From 4e6b3ce7d1ce97c3e1fb9ae53c5b2be1787acc6b Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Fri, 11 Nov 2016 13:06:27 +0100 Subject: Prepared new query based synchronization API --- common/synchronizer.cpp | 52 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) (limited to 'common/synchronizer.cpp') diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 85c68e4..713387e 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp @@ -222,21 +222,71 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray } } +QByteArrayList Synchronizer::resolveFilter(const QueryBase::Comparator &filter) +{ + QByteArrayList result; + if (filter.value.canConvert()) { + result << filter.value.value(); + } else if (filter.value.canConvert()) { + auto query = filter.value.value(); + Storage::EntityStore store{mResourceContext}; + DataStoreQuery dataStoreQuery{query, query.type(), store}; + auto resultSet = dataStoreQuery.execute(); + resultSet.replaySet(0, 0, [this, &result](const ResultSet::Result &r) { + result << r.entity.identifier(); + }); + } else { + SinkWarning() << "unknown filter type: " << filter; + Q_ASSERT(false); + } + return result; +} + template void Synchronizer::modify(const DomainType &entity) { modifyEntity(entity.identifier(), entity.revision(), ApplicationDomain::getTypeName(), entity); } +QList Synchronizer::getSyncRequests(const Sink::QueryBase &query) +{ + QList list; + list << Synchronizer::SyncRequest{query}; + return list; +} + KAsync::Job Synchronizer::synchronize(const Sink::QueryBase &query) { SinkTrace() << "Synchronizing"; + mSyncRequestQueue << getSyncRequests(query); + return processSyncQueue(); +} + +KAsync::Job Synchronizer::processSyncQueue() +{ + if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { + return KAsync::null(); + } mSyncInProgress = true; mMessageQueue->startTransaction(); - return synchronizeWithSource(query).syncThen([this]() { + + auto job = KAsync::null(); + while (!mSyncRequestQueue.isEmpty()) { + auto request = mSyncRequestQueue.takeFirst(); + job = job.then(synchronizeWithSource(request.query)).syncThen([this] { + //Commit after every request, so implementations only have to commit more if they add a lot of data. + commit(); + }); + } + return job.then([this](const KAsync::Error &error) { mSyncStore.clear(); mMessageQueue->commit(); mSyncInProgress = false; + if (error) { + SinkWarning() << "Error during sync: " << error.errorMessage; + return KAsync::error(error); + } + return KAsync::null(); }); } -- cgit v1.2.3