summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/commandprocessor.cpp1
-rw-r--r--common/synchronizer.cpp77
-rw-r--r--common/synchronizer.h11
3 files changed, 70 insertions, 19 deletions
diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp
index a6371be..5d5261f 100644
--- a/common/commandprocessor.cpp
+++ b/common/commandprocessor.cpp
@@ -331,6 +331,7 @@ KAsync::Job<void> CommandProcessor::flush(void const *command, size_t size)
331 mSynchronizer->flush(flushType, flushId); 331 mSynchronizer->flush(flushType, flushId);
332 } else { 332 } else {
333 SinkTraceCtx(mLogCtx) << "Emitting flush completion" << flushId; 333 SinkTraceCtx(mLogCtx) << "Emitting flush completion" << flushId;
334 mSynchronizer->flushComplete(flushId);
334 Sink::Notification n; 335 Sink::Notification n;
335 n.type = Sink::Notification::FlushCompletion; 336 n.type = Sink::Notification::FlushCompletion;
336 n.id = flushId; 337 n.id = flushId;
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index d94083b..3863cc4 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -274,12 +274,32 @@ void Synchronizer::flush(int commandId, const QByteArray &flushId)
274 processSyncQueue().exec(); 274 processSyncQueue().exec();
275} 275}
276 276
277void Synchronizer::flushComplete(const QByteArray &flushId)
278{
279 SinkTraceCtx(mLogCtx) << "Flush complete: " << flushId;
280 if (mPendingSyncRequests.contains(flushId)) {
281 const auto requests = mPendingSyncRequests.values(flushId);
282 for (const auto &r : requests) {
283 //We want to process the pending request before any others in the queue
284 mSyncRequestQueue.prepend(r);
285 }
286 mPendingSyncRequests.remove(flushId);
287 processSyncQueue().exec();
288 }
289}
290
277KAsync::Job<void> Synchronizer::processSyncQueue() 291KAsync::Job<void> Synchronizer::processSyncQueue()
278{ 292{
279 if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { 293 if (mSyncRequestQueue.isEmpty() || mSyncInProgress) {
280 SinkTrace() << "Sync still in progress or nothing to do."; 294 SinkTrace() << "Sync still in progress or nothing to do.";
281 return KAsync::null<void>(); 295 return KAsync::null<void>();
282 } 296 }
297 //Don't process any new requests until we're done with the pending ones.
298 //Otherwise we might process a flush before the previous request actually completed.
299 if (!mPendingSyncRequests.isEmpty()) {
300 SinkTrace() << "We still have pending sync requests. Not executing next request.";
301 return KAsync::null<void>();
302 }
283 303
284 auto job = KAsync::syncStart<void>([this] { 304 auto job = KAsync::syncStart<void>([this] {
285 mMessageQueue->startTransaction(); 305 mMessageQueue->startTransaction();
@@ -287,7 +307,26 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
287 }); 307 });
288 while (!mSyncRequestQueue.isEmpty()) { 308 while (!mSyncRequestQueue.isEmpty()) {
289 const auto request = mSyncRequestQueue.takeFirst(); 309 const auto request = mSyncRequestQueue.takeFirst();
290 if (request.requestType == Synchronizer::SyncRequest::Synchronization) { 310 if (request.options & SyncRequest::RequestFlush) {
311 job = job.then([=] {
312 //Trigger a flush and record original request without flush option
313 auto modifiedRequest = request;
314 modifiedRequest.options = SyncRequest::NoOptions;
315 //Normally we won't have a requestId here
316 if (modifiedRequest.requestId.isEmpty()) {
317 modifiedRequest.requestId = QUuid::createUuid().toRfc4122();
318 }
319
320 //The sync request will be executed once the flush has completed
321 mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest);
322
323 flatbuffers::FlatBufferBuilder fbb;
324 auto flushId = fbb.CreateString(modifiedRequest.requestId.toStdString());
325 auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization));
326 Sink::Commands::FinishFlushBuffer(fbb, location);
327 enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb));
328 });
329 } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) {
291 job = job.then([this, request] { 330 job = job.then([this, request] {
292 Sink::Notification n; 331 Sink::Notification n;
293 n.id = request.requestId; 332 n.id = request.requestId;
@@ -295,14 +334,14 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
295 n.message = "Synchronization has started."; 334 n.message = "Synchronization has started.";
296 n.code = ApplicationDomain::BusyStatus; 335 n.code = ApplicationDomain::BusyStatus;
297 emit notify(n); 336 emit notify(n);
298 SinkLogCtx(mLogCtx) << "Synchronizing " << request.query; 337 SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query;
299 }).then(synchronizeWithSource(request.query)).then([this] { 338 }).then(synchronizeWithSource(request.query)).then([this] {
300 //Commit after every request, so implementations only have to commit more if they add a lot of data. 339 //Commit after every request, so implementations only have to commit more if they add a lot of data.
301 commit(); 340 commit();
302 }).then<void>([this, request](const KAsync::Error &error) { 341 }).then<void>([this, request](const KAsync::Error &error) {
303 if (error) { 342 if (error) {
304 //Emit notification with error 343 //Emit notification with error
305 SinkWarning() << "Synchronization failed: " << error.errorMessage; 344 SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error.errorMessage;
306 Sink::Notification n; 345 Sink::Notification n;
307 n.id = request.requestId; 346 n.id = request.requestId;
308 n.type = Notification::Status; 347 n.type = Notification::Status;
@@ -311,7 +350,7 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
311 emit notify(n); 350 emit notify(n);
312 return KAsync::error(error); 351 return KAsync::error(error);
313 } else { 352 } else {
314 SinkLog() << "Done Synchronizing"; 353 SinkLogCtx(mLogCtx) << "Done Synchronizing";
315 Sink::Notification n; 354 Sink::Notification n;
316 n.id = request.requestId; 355 n.id = request.requestId;
317 n.type = Notification::Status; 356 n.type = Notification::Status;
@@ -322,20 +361,22 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
322 } 361 }
323 }); 362 });
324 } else if (request.requestType == Synchronizer::SyncRequest::Flush) { 363 } else if (request.requestType == Synchronizer::SyncRequest::Flush) {
325 Q_ASSERT(!request.requestId.isEmpty()); 364 job = job.then([=] {
326 if (request.flushType == Flush::FlushReplayQueue) { 365 Q_ASSERT(!request.requestId.isEmpty());
327 SinkTrace() << "Emitting flush completion."; 366 if (request.flushType == Flush::FlushReplayQueue) {
328 Sink::Notification n; 367 SinkTrace() << "Emitting flush completion.";
329 n.type = Sink::Notification::FlushCompletion; 368 Sink::Notification n;
330 n.id = request.requestId; 369 n.type = Sink::Notification::FlushCompletion;
331 emit notify(n); 370 n.id = request.requestId;
332 } else { 371 emit notify(n);
333 flatbuffers::FlatBufferBuilder fbb; 372 } else {
334 auto flushId = fbb.CreateString(request.requestId.toStdString()); 373 flatbuffers::FlatBufferBuilder fbb;
335 auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization)); 374 auto flushId = fbb.CreateString(request.requestId.toStdString());
336 Sink::Commands::FinishFlushBuffer(fbb, location); 375 auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization));
337 enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); 376 Sink::Commands::FinishFlushBuffer(fbb, location);
338 } 377 enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb));
378 }
379 });
339 } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { 380 } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) {
340 job = job.then(replayNextRevision()); 381 job = job.then(replayNextRevision());
341 } else { 382 } else {
diff --git a/common/synchronizer.h b/common/synchronizer.h
index a0a432c..be90293 100644
--- a/common/synchronizer.h
+++ b/common/synchronizer.h
@@ -57,6 +57,7 @@ public:
57 Sink::Storage::DataStore::Transaction &syncTransaction(); 57 Sink::Storage::DataStore::Transaction &syncTransaction();
58 58
59 bool allChangesReplayed() Q_DECL_OVERRIDE; 59 bool allChangesReplayed() Q_DECL_OVERRIDE;
60 void flushComplete(const QByteArray &flushId);
60 61
61signals: 62signals:
62 void notify(Notification); 63 void notify(Notification);
@@ -123,9 +124,15 @@ protected:
123 Flush 124 Flush
124 }; 125 };
125 126
126 SyncRequest(const Sink::QueryBase &q, const QByteArray &requestId_ = QByteArray()) 127 enum RequestOptions {
128 NoOptions,
129 RequestFlush
130 };
131
132 SyncRequest(const Sink::QueryBase &q, const QByteArray &requestId_ = QByteArray(), RequestOptions o = NoOptions)
127 : requestId(requestId_), 133 : requestId(requestId_),
128 requestType(Synchronization), 134 requestType(Synchronization),
135 options(o),
129 query(q) 136 query(q)
130 { 137 {
131 } 138 }
@@ -145,6 +152,7 @@ protected:
145 int flushType = 0; 152 int flushType = 0;
146 QByteArray requestId; 153 QByteArray requestId;
147 RequestType requestType; 154 RequestType requestType;
155 RequestOptions options = NoOptions;
148 Sink::QueryBase query; 156 Sink::QueryBase query;
149 }; 157 };
150 158
@@ -181,6 +189,7 @@ private:
181 QList<SyncRequest> mSyncRequestQueue; 189 QList<SyncRequest> mSyncRequestQueue;
182 MessageQueue *mMessageQueue; 190 MessageQueue *mMessageQueue;
183 bool mSyncInProgress; 191 bool mSyncInProgress;
192 QMultiHash<QByteArray, SyncRequest> mPendingSyncRequests;
184}; 193};
185 194
186} 195}