diff options
-rw-r--r-- | common/commandprocessor.cpp | 1 | ||||
-rw-r--r-- | common/synchronizer.cpp | 77 | ||||
-rw-r--r-- | common/synchronizer.h | 11 | ||||
-rw-r--r-- | examples/imapresource/imapresource.cpp | 3 |
4 files changed, 72 insertions, 20 deletions
diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp index a6371be..5d5261f 100644 --- a/common/commandprocessor.cpp +++ b/common/commandprocessor.cpp | |||
@@ -331,6 +331,7 @@ KAsync::Job<void> CommandProcessor::flush(void const *command, size_t size) | |||
331 | mSynchronizer->flush(flushType, flushId); | 331 | mSynchronizer->flush(flushType, flushId); |
332 | } else { | 332 | } else { |
333 | SinkTraceCtx(mLogCtx) << "Emitting flush completion" << flushId; | 333 | SinkTraceCtx(mLogCtx) << "Emitting flush completion" << flushId; |
334 | mSynchronizer->flushComplete(flushId); | ||
334 | Sink::Notification n; | 335 | Sink::Notification n; |
335 | n.type = Sink::Notification::FlushCompletion; | 336 | n.type = Sink::Notification::FlushCompletion; |
336 | n.id = flushId; | 337 | n.id = flushId; |
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index d94083b..3863cc4 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp | |||
@@ -274,12 +274,32 @@ void Synchronizer::flush(int commandId, const QByteArray &flushId) | |||
274 | processSyncQueue().exec(); | 274 | processSyncQueue().exec(); |
275 | } | 275 | } |
276 | 276 | ||
277 | void Synchronizer::flushComplete(const QByteArray &flushId) | ||
278 | { | ||
279 | SinkTraceCtx(mLogCtx) << "Flush complete: " << flushId; | ||
280 | if (mPendingSyncRequests.contains(flushId)) { | ||
281 | const auto requests = mPendingSyncRequests.values(flushId); | ||
282 | for (const auto &r : requests) { | ||
283 | //We want to process the pending request before any others in the queue | ||
284 | mSyncRequestQueue.prepend(r); | ||
285 | } | ||
286 | mPendingSyncRequests.remove(flushId); | ||
287 | processSyncQueue().exec(); | ||
288 | } | ||
289 | } | ||
290 | |||
277 | KAsync::Job<void> Synchronizer::processSyncQueue() | 291 | KAsync::Job<void> Synchronizer::processSyncQueue() |
278 | { | 292 | { |
279 | if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { | 293 | if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { |
280 | SinkTrace() << "Sync still in progress or nothing to do."; | 294 | SinkTrace() << "Sync still in progress or nothing to do."; |
281 | return KAsync::null<void>(); | 295 | return KAsync::null<void>(); |
282 | } | 296 | } |
297 | //Don't process any new requests until we're done with the pending ones. | ||
298 | //Otherwise we might process a flush before the previous request actually completed. | ||
299 | if (!mPendingSyncRequests.isEmpty()) { | ||
300 | SinkTrace() << "We still have pending sync requests. Not executing next request."; | ||
301 | return KAsync::null<void>(); | ||
302 | } | ||
283 | 303 | ||
284 | auto job = KAsync::syncStart<void>([this] { | 304 | auto job = KAsync::syncStart<void>([this] { |
285 | mMessageQueue->startTransaction(); | 305 | mMessageQueue->startTransaction(); |
@@ -287,7 +307,26 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
287 | }); | 307 | }); |
288 | while (!mSyncRequestQueue.isEmpty()) { | 308 | while (!mSyncRequestQueue.isEmpty()) { |
289 | const auto request = mSyncRequestQueue.takeFirst(); | 309 | const auto request = mSyncRequestQueue.takeFirst(); |
290 | if (request.requestType == Synchronizer::SyncRequest::Synchronization) { | 310 | if (request.options & SyncRequest::RequestFlush) { |
311 | job = job.then([=] { | ||
312 | //Trigger a flush and record original request without flush option | ||
313 | auto modifiedRequest = request; | ||
314 | modifiedRequest.options = SyncRequest::NoOptions; | ||
315 | //Normally we won't have a requestId here | ||
316 | if (modifiedRequest.requestId.isEmpty()) { | ||
317 | modifiedRequest.requestId = QUuid::createUuid().toRfc4122(); | ||
318 | } | ||
319 | |||
320 | //The sync request will be executed once the flush has completed | ||
321 | mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest); | ||
322 | |||
323 | flatbuffers::FlatBufferBuilder fbb; | ||
324 | auto flushId = fbb.CreateString(modifiedRequest.requestId.toStdString()); | ||
325 | auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization)); | ||
326 | Sink::Commands::FinishFlushBuffer(fbb, location); | ||
327 | enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); | ||
328 | }); | ||
329 | } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { | ||
291 | job = job.then([this, request] { | 330 | job = job.then([this, request] { |
292 | Sink::Notification n; | 331 | Sink::Notification n; |
293 | n.id = request.requestId; | 332 | n.id = request.requestId; |
@@ -295,14 +334,14 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
295 | n.message = "Synchronization has started."; | 334 | n.message = "Synchronization has started."; |
296 | n.code = ApplicationDomain::BusyStatus; | 335 | n.code = ApplicationDomain::BusyStatus; |
297 | emit notify(n); | 336 | emit notify(n); |
298 | SinkLogCtx(mLogCtx) << "Synchronizing " << request.query; | 337 | SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query; |
299 | }).then(synchronizeWithSource(request.query)).then([this] { | 338 | }).then(synchronizeWithSource(request.query)).then([this] { |
300 | //Commit after every request, so implementations only have to commit more if they add a lot of data. | 339 | //Commit after every request, so implementations only have to commit more if they add a lot of data. |
301 | commit(); | 340 | commit(); |
302 | }).then<void>([this, request](const KAsync::Error &error) { | 341 | }).then<void>([this, request](const KAsync::Error &error) { |
303 | if (error) { | 342 | if (error) { |
304 | //Emit notification with error | 343 | //Emit notification with error |
305 | SinkWarning() << "Synchronization failed: " << error.errorMessage; | 344 | SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error.errorMessage; |
306 | Sink::Notification n; | 345 | Sink::Notification n; |
307 | n.id = request.requestId; | 346 | n.id = request.requestId; |
308 | n.type = Notification::Status; | 347 | n.type = Notification::Status; |
@@ -311,7 +350,7 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
311 | emit notify(n); | 350 | emit notify(n); |
312 | return KAsync::error(error); | 351 | return KAsync::error(error); |
313 | } else { | 352 | } else { |
314 | SinkLog() << "Done Synchronizing"; | 353 | SinkLogCtx(mLogCtx) << "Done Synchronizing"; |
315 | Sink::Notification n; | 354 | Sink::Notification n; |
316 | n.id = request.requestId; | 355 | n.id = request.requestId; |
317 | n.type = Notification::Status; | 356 | n.type = Notification::Status; |
@@ -322,20 +361,22 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
322 | } | 361 | } |
323 | }); | 362 | }); |
324 | } else if (request.requestType == Synchronizer::SyncRequest::Flush) { | 363 | } else if (request.requestType == Synchronizer::SyncRequest::Flush) { |
325 | Q_ASSERT(!request.requestId.isEmpty()); | 364 | job = job.then([=] { |
326 | if (request.flushType == Flush::FlushReplayQueue) { | 365 | Q_ASSERT(!request.requestId.isEmpty()); |
327 | SinkTrace() << "Emitting flush completion."; | 366 | if (request.flushType == Flush::FlushReplayQueue) { |
328 | Sink::Notification n; | 367 | SinkTrace() << "Emitting flush completion."; |
329 | n.type = Sink::Notification::FlushCompletion; | 368 | Sink::Notification n; |
330 | n.id = request.requestId; | 369 | n.type = Sink::Notification::FlushCompletion; |
331 | emit notify(n); | 370 | n.id = request.requestId; |
332 | } else { | 371 | emit notify(n); |
333 | flatbuffers::FlatBufferBuilder fbb; | 372 | } else { |
334 | auto flushId = fbb.CreateString(request.requestId.toStdString()); | 373 | flatbuffers::FlatBufferBuilder fbb; |
335 | auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization)); | 374 | auto flushId = fbb.CreateString(request.requestId.toStdString()); |
336 | Sink::Commands::FinishFlushBuffer(fbb, location); | 375 | auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization)); |
337 | enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); | 376 | Sink::Commands::FinishFlushBuffer(fbb, location); |
338 | } | 377 | enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); |
378 | } | ||
379 | }); | ||
339 | } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { | 380 | } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { |
340 | job = job.then(replayNextRevision()); | 381 | job = job.then(replayNextRevision()); |
341 | } else { | 382 | } else { |
diff --git a/common/synchronizer.h b/common/synchronizer.h index a0a432c..be90293 100644 --- a/common/synchronizer.h +++ b/common/synchronizer.h | |||
@@ -57,6 +57,7 @@ public: | |||
57 | Sink::Storage::DataStore::Transaction &syncTransaction(); | 57 | Sink::Storage::DataStore::Transaction &syncTransaction(); |
58 | 58 | ||
59 | bool allChangesReplayed() Q_DECL_OVERRIDE; | 59 | bool allChangesReplayed() Q_DECL_OVERRIDE; |
60 | void flushComplete(const QByteArray &flushId); | ||
60 | 61 | ||
61 | signals: | 62 | signals: |
62 | void notify(Notification); | 63 | void notify(Notification); |
@@ -123,9 +124,15 @@ protected: | |||
123 | Flush | 124 | Flush |
124 | }; | 125 | }; |
125 | 126 | ||
126 | SyncRequest(const Sink::QueryBase &q, const QByteArray &requestId_ = QByteArray()) | 127 | enum RequestOptions { |
128 | NoOptions, | ||
129 | RequestFlush | ||
130 | }; | ||
131 | |||
132 | SyncRequest(const Sink::QueryBase &q, const QByteArray &requestId_ = QByteArray(), RequestOptions o = NoOptions) | ||
127 | : requestId(requestId_), | 133 | : requestId(requestId_), |
128 | requestType(Synchronization), | 134 | requestType(Synchronization), |
135 | options(o), | ||
129 | query(q) | 136 | query(q) |
130 | { | 137 | { |
131 | } | 138 | } |
@@ -145,6 +152,7 @@ protected: | |||
145 | int flushType = 0; | 152 | int flushType = 0; |
146 | QByteArray requestId; | 153 | QByteArray requestId; |
147 | RequestType requestType; | 154 | RequestType requestType; |
155 | RequestOptions options = NoOptions; | ||
148 | Sink::QueryBase query; | 156 | Sink::QueryBase query; |
149 | }; | 157 | }; |
150 | 158 | ||
@@ -181,6 +189,7 @@ private: | |||
181 | QList<SyncRequest> mSyncRequestQueue; | 189 | QList<SyncRequest> mSyncRequestQueue; |
182 | MessageQueue *mMessageQueue; | 190 | MessageQueue *mMessageQueue; |
183 | bool mSyncInProgress; | 191 | bool mSyncInProgress; |
192 | QMultiHash<QByteArray, SyncRequest> mPendingSyncRequests; | ||
184 | }; | 193 | }; |
185 | 194 | ||
186 | } | 195 | } |
diff --git a/examples/imapresource/imapresource.cpp b/examples/imapresource/imapresource.cpp index 25d905b..06dc340 100644 --- a/examples/imapresource/imapresource.cpp +++ b/examples/imapresource/imapresource.cpp | |||
@@ -386,7 +386,8 @@ public: | |||
386 | list << Synchronizer::SyncRequest{query}; | 386 | list << Synchronizer::SyncRequest{query}; |
387 | } else { | 387 | } else { |
388 | list << Synchronizer::SyncRequest{Sink::QueryBase(ApplicationDomain::getTypeName<ApplicationDomain::Folder>())}; | 388 | list << Synchronizer::SyncRequest{Sink::QueryBase(ApplicationDomain::getTypeName<ApplicationDomain::Folder>())}; |
389 | list << Synchronizer::SyncRequest{applyMailDefaults(Sink::QueryBase(ApplicationDomain::getTypeName<ApplicationDomain::Mail>()))}; | 389 | //This request depends on the previous one so we flush first. |
390 | list << Synchronizer::SyncRequest{applyMailDefaults(Sink::QueryBase(ApplicationDomain::getTypeName<ApplicationDomain::Mail>())), QByteArray{}, Synchronizer::SyncRequest::RequestFlush}; | ||
390 | } | 391 | } |
391 | return list; | 392 | return list; |
392 | } | 393 | } |