author    Christian Mollekopf <chrigi_1@fastmail.fm>    2017-01-12 11:45:15 +0100
committer Christian Mollekopf <chrigi_1@fastmail.fm>    2017-01-12 11:45:15 +0100
commit    7daeec83233c522980d5e477fee82045de57f77d (patch)
tree      5e03d11d23c764eb03f27393fcc37c0529405e10 /common
parent    af8baff21529b5bc47725da3e9e00ec81e5b6f1b (diff)
syncThen is no longer necessary
Diffstat (limited to 'common')
-rw-r--r--  common/changereplay.cpp       2
-rw-r--r--  common/commandprocessor.cpp  12
-rw-r--r--  common/messagequeue.cpp       2
-rw-r--r--  common/pipeline.cpp           2
-rw-r--r--  common/queryrunner.cpp        4
-rw-r--r--  common/resourcecontrol.cpp    4
-rw-r--r--  common/store.cpp              2
-rw-r--r--  common/synchronizer.cpp       8
8 files changed, 18 insertions, 18 deletions
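
Every hunk below applies the same mechanical change: as the commit message notes, KAsync's then() accepts a synchronous continuation directly, so the separately typed syncThen<T>() calls are no longer needed, and continuations that only returned a constant can chain a ready-made value job instead. A minimal sketch of the two variants follows; someJob() and doCleanup() are hypothetical placeholders, not code from this commit, and the include path for KAsync depends on the build setup.

    // Before: the continuation's result type is spelled out on syncThen.
    KAsync::Job<void> before = someJob()          // someJob(): hypothetical KAsync::Job<void> source
        .syncThen<void>([] {
            doCleanup();                          // doCleanup(): hypothetical synchronous helper
        });

    // After: then() takes the synchronous lambda directly and deduces the types.
    KAsync::Job<void> after = someJob()
        .then([] {
            doCleanup();
        });

    // Continuations that only produced a constant chain a value job instead,
    // as seen in commandprocessor.cpp:
    //   .syncThen<qint64>([]() { return -1; })   becomes   .then(KAsync::value<qint64>(-1))
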
diff --git a/common/changereplay.cpp b/common/changereplay.cpp
index 5d5afbb..224fb25 100644
--- a/common/changereplay.cpp
+++ b/common/changereplay.cpp
@@ -151,7 +151,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
                     return KAsync::value(KAsync::Break);
                 });
             }))
-            .syncThen<void>([this, lastReplayedRevision]() {
+            .then([this, lastReplayedRevision]() {
                 recordReplayedRevision(*lastReplayedRevision);
                 mMainStoreTransaction.abort();
                 if (ChangeReplay::allChangesReplayed()) {
diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp
index 7cd4a5f..87a120b 100644
--- a/common/commandprocessor.cpp
+++ b/common/commandprocessor.cpp
@@ -169,7 +169,7 @@ void CommandProcessor::process()
     }
     mProcessingLock = true;
     auto job = processPipeline()
-        .syncThen<void>([this]() {
+        .then([this]() {
             mProcessingLock = false;
             if (messagesToProcessAvailable()) {
                 process();
@@ -193,10 +193,10 @@ KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const Sink::QueuedCom
         case Sink::Commands::InspectionCommand:
             Q_ASSERT(mInspector);
             return mInspector->processCommand(data, size)
-                .syncThen<qint64>([]() { return -1; });
+                .then(KAsync::value<qint64>(-1));
         case Sink::Commands::FlushCommand:
             return flush(data, size)
-                .syncThen<qint64>([]() { return -1; });
+                .then(KAsync::value<qint64>(-1));
         default:
             return KAsync::error<qint64>(-1, "Unhandled command");
     }
@@ -234,7 +234,7 @@ KAsync::Job<void> CommandProcessor::processQueue(MessageQueue *queue)
         [this, time](const QByteArray &data) -> KAsync::Job<void> {
             time->start();
             return processQueuedCommand(data)
-                .syncThen<void, qint64>([this, time](qint64 createdRevision) {
+                .then([this, time](qint64 createdRevision) {
                     SinkTraceCtx(mLogCtx) << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed());
                 });
         })
@@ -251,7 +251,7 @@ KAsync::Job<void> CommandProcessor::processQueue(MessageQueue *queue)
             }
         });
     }))
-        .syncThen<void>([this](const KAsync::Error &) { mPipeline->commit(); });
+        .then([this](const KAsync::Error &) { mPipeline->commit(); });
 }
 
 KAsync::Job<void> CommandProcessor::processPipeline()
@@ -273,7 +273,7 @@ KAsync::Job<void> CommandProcessor::processPipeline()
 
         auto queue = it->next();
         return processQueue(queue)
-            .syncThen<KAsync::ControlFlowFlag>([this, time, it]() {
+            .then([this, time, it]() {
                 SinkTraceCtx(mLogCtx) << "Queue processed." << Log::TraceTime(time->elapsed());
                 if (it->hasNext()) {
                     return KAsync::Continue;
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp
index 0fcbf99..6e79d89 100644
--- a/common/messagequeue.cpp
+++ b/common/messagequeue.cpp
@@ -108,7 +108,7 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi
 
     // Trace() << "Waiting on " << waitCondition.size() << " results";
     KAsync::waitForCompletion(waitCondition)
-        .syncThen<void>([this, resultCount, &future]() {
+        .then([this, resultCount, &future]() {
             processRemovals();
             if (*resultCount == 0) {
                 future.setFinished();
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 4cb5f21..32f6454 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -279,7 +279,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
 
     SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier() << targetResource;
     auto job = TypeHelper<CreateHelper>{bufferType}.operator()<KAsync::Job<void>, ApplicationDomain::ApplicationDomainType&>(newEntity);
-    job = job.syncThen<void>([this, current, isMove, targetResource, bufferType](const KAsync::Error &error) {
+    job = job.then([this, current, isMove, targetResource, bufferType](const KAsync::Error &error) {
         if (!error) {
             SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull";
             if (isMove) {
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index f1e21f4..6730894 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -94,7 +94,7 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
             const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize);
             return newRevisionAndReplayedEntities;
         })
-        .template syncThen<void, ReplayResult>([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
+        .template then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
             if (!guardPtr) {
                 qWarning() << "The parent object is already gone";
                 return;
@@ -124,7 +124,7 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
             const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider);
             return newRevisionAndReplayedEntities;
         })
-        .template syncThen<void, ReplayResult>([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
+        .template then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
             if (!guardPtr) {
                 qWarning() << "The parent object is already gone";
                 return;
diff --git a/common/resourcecontrol.cpp b/common/resourcecontrol.cpp
index c1fbf06..b24c902 100644
--- a/common/resourcecontrol.cpp
+++ b/common/resourcecontrol.cpp
@@ -64,7 +64,7 @@ KAsync::Job<void> ResourceControl::shutdown(const QByteArray &identifier)
                 future.setFinished();
             }
         });
-    }).syncThen<void>([time] {
+    }).then([time] {
         SinkTrace() << "Shutdown complete." << Log::TraceTime(time->elapsed());
     });
 });
@@ -77,7 +77,7 @@ KAsync::Job<void> ResourceControl::start(const QByteArray &identifier)
     time->start();
     auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier));
     resourceAccess->open();
-    return resourceAccess->sendCommand(Sink::Commands::PingCommand).addToContext(resourceAccess).syncThen<void>([time]() { SinkTrace() << "Start complete." << Log::TraceTime(time->elapsed()); });
+    return resourceAccess->sendCommand(Sink::Commands::PingCommand).addToContext(resourceAccess).then([time]() { SinkTrace() << "Start complete." << Log::TraceTime(time->elapsed()); });
 }
 
 KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArrayList &resourceIdentifier)
diff --git a/common/store.cpp b/common/store.cpp
index 38445e5..b4ef2b7 100644
--- a/common/store.cpp
+++ b/common/store.cpp
@@ -263,7 +263,7 @@ KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier)
             future.setFinished();
         }
     })
-        .syncThen<void>([time]() {
+        .then([time]() {
             SinkTrace() << "Remove from disk complete." << Log::TraceTime(time->elapsed());
         });
 }
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index a4e64c9..bb1a3f4 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -286,7 +286,7 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
     while (!mSyncRequestQueue.isEmpty()) {
         auto request = mSyncRequestQueue.takeFirst();
         if (request.requestType == Synchronizer::SyncRequest::Synchronization) {
-            job = job.syncThen<void>([this, request] {
+            job = job.then([this, request] {
                 Sink::Notification n;
                 n.id = request.requestId;
                 n.type = Notification::Status;
@@ -294,7 +294,7 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
                 n.code = ApplicationDomain::BusyStatus;
                 emit notify(n);
                 SinkLogCtx(mLogCtx) << "Synchronizing " << request.query;
-            }).then(synchronizeWithSource(request.query)).syncThen<void>([this] {
+            }).then(synchronizeWithSource(request.query)).then([this] {
                 //Commit after every request, so implementations only have to commit more if they add a lot of data.
                 commit();
             }).then<void>([this, request](const KAsync::Error &error) {
@@ -431,7 +431,7 @@ KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray
         job = replay(mail, operation, oldRemoteId, modifiedProperties);
     }
 
-    return job.syncThen<void, QByteArray>([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) {
+    return job.then([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) {
         if (operation == Sink::Operation_Creation) {
             SinkTrace() << "Replayed creation with remote id: " << remoteId;
             if (remoteId.isEmpty()) {
@@ -453,7 +453,7 @@ KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray
             SinkError() << "Unkown operation" << operation;
         }
     })
-    .syncThen<void>([this](const KAsync::Error &error) {
+    .then([this](const KAsync::Error &error) {
         if (error) {
             SinkWarning() << "Failed to replay change: " << error.errorMessage;
         }