diff options
Diffstat (limited to 'common')
-rw-r--r-- | common/changereplay.cpp | 4 | ||||
-rw-r--r-- | common/changereplay.h | 4 | ||||
-rw-r--r-- | common/genericresource.cpp | 20 | ||||
-rw-r--r-- | common/genericresource.h | 2 | ||||
-rw-r--r-- | common/synchronizer.cpp | 22 | ||||
-rw-r--r-- | common/synchronizer.h | 16 |
6 files changed, 41 insertions, 27 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp index 6e58564..a0796eb 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp | |||
@@ -71,8 +71,10 @@ void ChangeReplay::recordReplayedRevision(qint64 revision) | |||
71 | 71 | ||
72 | KAsync::Job<void> ChangeReplay::replayNextRevision() | 72 | KAsync::Job<void> ChangeReplay::replayNextRevision() |
73 | { | 73 | { |
74 | Q_ASSERT(!mReplayInProgress); | ||
74 | auto lastReplayedRevision = QSharedPointer<qint64>::create(0); | 75 | auto lastReplayedRevision = QSharedPointer<qint64>::create(0); |
75 | auto topRevision = QSharedPointer<qint64>::create(0); | 76 | auto topRevision = QSharedPointer<qint64>::create(0); |
77 | emit replayingChanges(); | ||
76 | return KAsync::syncStart<void>([this, lastReplayedRevision, topRevision]() { | 78 | return KAsync::syncStart<void>([this, lastReplayedRevision, topRevision]() { |
77 | mReplayInProgress = true; | 79 | mReplayInProgress = true; |
78 | mMainStoreTransaction = mStorage.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { | 80 | mMainStoreTransaction = mStorage.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { |
@@ -157,6 +159,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
157 | emit changesReplayed(); | 159 | emit changesReplayed(); |
158 | } else { | 160 | } else { |
159 | QTimer::singleShot(0, [this]() { | 161 | QTimer::singleShot(0, [this]() { |
162 | mReplayInProgress = false; | ||
160 | replayNextRevision().exec(); | 163 | replayNextRevision().exec(); |
161 | }); | 164 | }); |
162 | } | 165 | } |
@@ -166,7 +169,6 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
166 | void ChangeReplay::revisionChanged() | 169 | void ChangeReplay::revisionChanged() |
167 | { | 170 | { |
168 | if (!mReplayInProgress) { | 171 | if (!mReplayInProgress) { |
169 | emit replayingChanges(); | ||
170 | replayNextRevision().exec(); | 172 | replayNextRevision().exec(); |
171 | } | 173 | } |
172 | } | 174 | } |
diff --git a/common/changereplay.h b/common/changereplay.h index e86c4f2..4894806 100644 --- a/common/changereplay.h +++ b/common/changereplay.h | |||
@@ -49,16 +49,16 @@ signals: | |||
49 | void replayingChanges(); | 49 | void replayingChanges(); |
50 | 50 | ||
51 | public slots: | 51 | public slots: |
52 | void revisionChanged(); | 52 | virtual void revisionChanged(); |
53 | 53 | ||
54 | protected: | 54 | protected: |
55 | virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; | 55 | virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; |
56 | virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; | 56 | virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; |
57 | Sink::Storage::DataStore mStorage; | 57 | Sink::Storage::DataStore mStorage; |
58 | KAsync::Job<void> replayNextRevision(); | ||
58 | 59 | ||
59 | private: | 60 | private: |
60 | void recordReplayedRevision(qint64 revision); | 61 | void recordReplayedRevision(qint64 revision); |
61 | KAsync::Job<void> replayNextRevision(); | ||
62 | Sink::Storage::DataStore mChangeReplayStore; | 62 | Sink::Storage::DataStore mChangeReplayStore; |
63 | bool mReplayInProgress; | 63 | bool mReplayInProgress; |
64 | Sink::Storage::DataStore::Transaction mMainStoreTransaction; | 64 | Sink::Storage::DataStore::Transaction mMainStoreTransaction; |
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 746fa33..e36b750 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -290,19 +290,6 @@ KAsync::Job<void> GenericResource::inspect( | |||
290 | return KAsync::null<void>(); | 290 | return KAsync::null<void>(); |
291 | } | 291 | } |
292 | 292 | ||
293 | void GenericResource::enableChangeReplay(bool enable) | ||
294 | { | ||
295 | Q_ASSERT(mSynchronizer); | ||
296 | if (enable) { | ||
297 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); | ||
298 | QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | ||
299 | QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection); | ||
300 | } else { | ||
301 | QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged); | ||
302 | QObject::disconnect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | ||
303 | } | ||
304 | } | ||
305 | |||
306 | void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors) | 293 | void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors) |
307 | { | 294 | { |
308 | mPipeline->setPreprocessors(type, preprocessors); | 295 | mPipeline->setPreprocessors(type, preprocessors); |
@@ -338,7 +325,9 @@ void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &sync | |||
338 | } | 325 | } |
339 | 326 | ||
340 | mProcessor->setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); | 327 | mProcessor->setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); |
341 | enableChangeReplay(true); | 328 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); |
329 | QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | ||
330 | QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection); | ||
342 | } | 331 | } |
343 | 332 | ||
344 | void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) | 333 | void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) |
@@ -406,11 +395,8 @@ KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase & | |||
406 | emit notify(n); | 395 | emit notify(n); |
407 | 396 | ||
408 | SinkLog() << " Synchronizing"; | 397 | SinkLog() << " Synchronizing"; |
409 | // Changereplay would deadlock otherwise when trying to open the synchronization store | ||
410 | enableChangeReplay(false); | ||
411 | return mSynchronizer->synchronize(query) | 398 | return mSynchronizer->synchronize(query) |
412 | .then<void>([this](const KAsync::Error &error) { | 399 | .then<void>([this](const KAsync::Error &error) { |
413 | enableChangeReplay(true); | ||
414 | if (!error) { | 400 | if (!error) { |
415 | SinkLog() << "Done Synchronizing"; | 401 | SinkLog() << "Done Synchronizing"; |
416 | Sink::Notification n; | 402 | Sink::Notification n; |
diff --git a/common/genericresource.h b/common/genericresource.h index 7e0f5ad..3f92e93 100644 --- a/common/genericresource.h +++ b/common/genericresource.h | |||
@@ -62,8 +62,6 @@ private slots: | |||
62 | void updateLowerBoundRevision(); | 62 | void updateLowerBoundRevision(); |
63 | 63 | ||
64 | protected: | 64 | protected: |
65 | void enableChangeReplay(bool); | ||
66 | |||
67 | void setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors); | 65 | void setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors); |
68 | void setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer); | 66 | void setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer); |
69 | 67 | ||
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 10acefc..8010689 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp | |||
@@ -274,10 +274,14 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
274 | auto job = KAsync::null<void>(); | 274 | auto job = KAsync::null<void>(); |
275 | while (!mSyncRequestQueue.isEmpty()) { | 275 | while (!mSyncRequestQueue.isEmpty()) { |
276 | auto request = mSyncRequestQueue.takeFirst(); | 276 | auto request = mSyncRequestQueue.takeFirst(); |
277 | job = job.then(synchronizeWithSource(request.query)).syncThen<void>([this] { | 277 | if (request.requestType == Synchronizer::SyncRequest::Synchronization) { |
278 | //Commit after every request, so implementations only have to commit more if they add a lot of data. | 278 | job = job.then(synchronizeWithSource(request.query)).syncThen<void>([this] { |
279 | commit(); | 279 | //Commit after every request, so implementations only have to commit more if they add a lot of data. |
280 | }); | 280 | commit(); |
281 | }); | ||
282 | } else { | ||
283 | job = replayNextRevision(); | ||
284 | } | ||
281 | } | 285 | } |
282 | return job.then<void>([this](const KAsync::Error &error) { | 286 | return job.then<void>([this](const KAsync::Error &error) { |
283 | mSyncStore.clear(); | 287 | mSyncStore.clear(); |
@@ -311,6 +315,12 @@ Sink::Storage::DataStore::DataStore::Transaction &Synchronizer::syncTransaction( | |||
311 | return mSyncTransaction; | 315 | return mSyncTransaction; |
312 | } | 316 | } |
313 | 317 | ||
318 | void Synchronizer::revisionChanged() | ||
319 | { | ||
320 | mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::ChangeReplay}; | ||
321 | processSyncQueue().exec(); | ||
322 | } | ||
323 | |||
314 | bool Synchronizer::canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) | 324 | bool Synchronizer::canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) |
315 | { | 325 | { |
316 | Sink::EntityBuffer buffer(value); | 326 | Sink::EntityBuffer buffer(value); |
@@ -334,7 +344,6 @@ KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray | |||
334 | Q_ASSERT(!mSyncStore); | 344 | Q_ASSERT(!mSyncStore); |
335 | Q_ASSERT(!mSyncTransaction); | 345 | Q_ASSERT(!mSyncTransaction); |
336 | mEntityStore->startTransaction(Storage::DataStore::ReadOnly); | 346 | mEntityStore->startTransaction(Storage::DataStore::ReadOnly); |
337 | mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::ReadWrite); | ||
338 | 347 | ||
339 | const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; | 348 | const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; |
340 | const auto uid = Sink::Storage::DataStore::uidFromKey(key); | 349 | const auto uid = Sink::Storage::DataStore::uidFromKey(key); |
@@ -345,6 +354,9 @@ KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray | |||
345 | oldRemoteId = syncStore().resolveLocalId(type, uid); | 354 | oldRemoteId = syncStore().resolveLocalId(type, uid); |
346 | if (oldRemoteId.isEmpty()) { | 355 | if (oldRemoteId.isEmpty()) { |
347 | SinkWarning() << "Couldn't find the remote id for: " << type << uid; | 356 | SinkWarning() << "Couldn't find the remote id for: " << type << uid; |
357 | mSyncStore.clear(); | ||
358 | mSyncTransaction.abort(); | ||
359 | mEntityStore->abortTransaction(); | ||
348 | return KAsync::error<void>(1, "Couldn't find the remote id."); | 360 | return KAsync::error<void>(1, "Couldn't find the remote id."); |
349 | } | 361 | } |
350 | } | 362 | } |
diff --git a/common/synchronizer.h b/common/synchronizer.h index 0a51f54..9a71869 100644 --- a/common/synchronizer.h +++ b/common/synchronizer.h | |||
@@ -55,6 +55,9 @@ public: | |||
55 | void commit(); | 55 | void commit(); |
56 | Sink::Storage::DataStore::Transaction &syncTransaction(); | 56 | Sink::Storage::DataStore::Transaction &syncTransaction(); |
57 | 57 | ||
58 | public slots: | ||
59 | virtual void revisionChanged() Q_DECL_OVERRIDE; | ||
60 | |||
58 | protected: | 61 | protected: |
59 | ///Base implementation calls the replay$Type calls | 62 | ///Base implementation calls the replay$Type calls |
60 | virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; | 63 | virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; |
@@ -108,13 +111,26 @@ protected: | |||
108 | virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) = 0; | 111 | virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) = 0; |
109 | 112 | ||
110 | struct SyncRequest { | 113 | struct SyncRequest { |
114 | enum RequestType { | ||
115 | Synchronization, | ||
116 | ChangeReplay | ||
117 | }; | ||
118 | |||
111 | SyncRequest(const Sink::QueryBase &q) | 119 | SyncRequest(const Sink::QueryBase &q) |
112 | : flushQueue(false), | 120 | : flushQueue(false), |
121 | requestType(Synchronization), | ||
113 | query(q) | 122 | query(q) |
114 | { | 123 | { |
115 | } | 124 | } |
116 | 125 | ||
126 | SyncRequest(RequestType type) | ||
127 | : flushQueue(false), | ||
128 | requestType(type) | ||
129 | { | ||
130 | } | ||
131 | |||
117 | bool flushQueue; | 132 | bool flushQueue; |
133 | RequestType requestType; | ||
118 | Sink::QueryBase query; | 134 | Sink::QueryBase query; |
119 | }; | 135 | }; |
120 | 136 | ||