summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/changereplay.cpp4
-rw-r--r--common/changereplay.h4
-rw-r--r--common/genericresource.cpp20
-rw-r--r--common/genericresource.h2
-rw-r--r--common/synchronizer.cpp22
-rw-r--r--common/synchronizer.h16
6 files changed, 41 insertions, 27 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp
index 6e58564..a0796eb 100644
--- a/common/changereplay.cpp
+++ b/common/changereplay.cpp
@@ -71,8 +71,10 @@ void ChangeReplay::recordReplayedRevision(qint64 revision)
71 71
72KAsync::Job<void> ChangeReplay::replayNextRevision() 72KAsync::Job<void> ChangeReplay::replayNextRevision()
73{ 73{
74 Q_ASSERT(!mReplayInProgress);
74 auto lastReplayedRevision = QSharedPointer<qint64>::create(0); 75 auto lastReplayedRevision = QSharedPointer<qint64>::create(0);
75 auto topRevision = QSharedPointer<qint64>::create(0); 76 auto topRevision = QSharedPointer<qint64>::create(0);
77 emit replayingChanges();
76 return KAsync::syncStart<void>([this, lastReplayedRevision, topRevision]() { 78 return KAsync::syncStart<void>([this, lastReplayedRevision, topRevision]() {
77 mReplayInProgress = true; 79 mReplayInProgress = true;
78 mMainStoreTransaction = mStorage.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { 80 mMainStoreTransaction = mStorage.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) {
@@ -157,6 +159,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
157 emit changesReplayed(); 159 emit changesReplayed();
158 } else { 160 } else {
159 QTimer::singleShot(0, [this]() { 161 QTimer::singleShot(0, [this]() {
162 mReplayInProgress = false;
160 replayNextRevision().exec(); 163 replayNextRevision().exec();
161 }); 164 });
162 } 165 }
@@ -166,7 +169,6 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
166void ChangeReplay::revisionChanged() 169void ChangeReplay::revisionChanged()
167{ 170{
168 if (!mReplayInProgress) { 171 if (!mReplayInProgress) {
169 emit replayingChanges();
170 replayNextRevision().exec(); 172 replayNextRevision().exec();
171 } 173 }
172} 174}
diff --git a/common/changereplay.h b/common/changereplay.h
index e86c4f2..4894806 100644
--- a/common/changereplay.h
+++ b/common/changereplay.h
@@ -49,16 +49,16 @@ signals:
49 void replayingChanges(); 49 void replayingChanges();
50 50
51public slots: 51public slots:
52 void revisionChanged(); 52 virtual void revisionChanged();
53 53
54protected: 54protected:
55 virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; 55 virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0;
56 virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; 56 virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0;
57 Sink::Storage::DataStore mStorage; 57 Sink::Storage::DataStore mStorage;
58 KAsync::Job<void> replayNextRevision();
58 59
59private: 60private:
60 void recordReplayedRevision(qint64 revision); 61 void recordReplayedRevision(qint64 revision);
61 KAsync::Job<void> replayNextRevision();
62 Sink::Storage::DataStore mChangeReplayStore; 62 Sink::Storage::DataStore mChangeReplayStore;
63 bool mReplayInProgress; 63 bool mReplayInProgress;
64 Sink::Storage::DataStore::Transaction mMainStoreTransaction; 64 Sink::Storage::DataStore::Transaction mMainStoreTransaction;
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 746fa33..e36b750 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -290,19 +290,6 @@ KAsync::Job<void> GenericResource::inspect(
290 return KAsync::null<void>(); 290 return KAsync::null<void>();
291} 291}
292 292
293void GenericResource::enableChangeReplay(bool enable)
294{
295 Q_ASSERT(mSynchronizer);
296 if (enable) {
297 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection);
298 QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
299 QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection);
300 } else {
301 QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged);
302 QObject::disconnect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
303 }
304}
305
306void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors) 293void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors)
307{ 294{
308 mPipeline->setPreprocessors(type, preprocessors); 295 mPipeline->setPreprocessors(type, preprocessors);
@@ -338,7 +325,9 @@ void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &sync
338 } 325 }
339 326
340 mProcessor->setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); 327 mProcessor->setOldestUsedRevision(mSynchronizer->getLastReplayedRevision());
341 enableChangeReplay(true); 328 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection);
329 QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
330 QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection);
342} 331}
343 332
344void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) 333void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier)
@@ -406,11 +395,8 @@ KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase &
406 emit notify(n); 395 emit notify(n);
407 396
408 SinkLog() << " Synchronizing"; 397 SinkLog() << " Synchronizing";
409 // Changereplay would deadlock otherwise when trying to open the synchronization store
410 enableChangeReplay(false);
411 return mSynchronizer->synchronize(query) 398 return mSynchronizer->synchronize(query)
412 .then<void>([this](const KAsync::Error &error) { 399 .then<void>([this](const KAsync::Error &error) {
413 enableChangeReplay(true);
414 if (!error) { 400 if (!error) {
415 SinkLog() << "Done Synchronizing"; 401 SinkLog() << "Done Synchronizing";
416 Sink::Notification n; 402 Sink::Notification n;
diff --git a/common/genericresource.h b/common/genericresource.h
index 7e0f5ad..3f92e93 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -62,8 +62,6 @@ private slots:
62 void updateLowerBoundRevision(); 62 void updateLowerBoundRevision();
63 63
64protected: 64protected:
65 void enableChangeReplay(bool);
66
67 void setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors); 65 void setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors);
68 void setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer); 66 void setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer);
69 67
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 10acefc..8010689 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -274,10 +274,14 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
274 auto job = KAsync::null<void>(); 274 auto job = KAsync::null<void>();
275 while (!mSyncRequestQueue.isEmpty()) { 275 while (!mSyncRequestQueue.isEmpty()) {
276 auto request = mSyncRequestQueue.takeFirst(); 276 auto request = mSyncRequestQueue.takeFirst();
277 job = job.then(synchronizeWithSource(request.query)).syncThen<void>([this] { 277 if (request.requestType == Synchronizer::SyncRequest::Synchronization) {
278 //Commit after every request, so implementations only have to commit more if they add a lot of data. 278 job = job.then(synchronizeWithSource(request.query)).syncThen<void>([this] {
279 commit(); 279 //Commit after every request, so implementations only have to commit more if they add a lot of data.
280 }); 280 commit();
281 });
282 } else {
283 job = replayNextRevision();
284 }
281 } 285 }
282 return job.then<void>([this](const KAsync::Error &error) { 286 return job.then<void>([this](const KAsync::Error &error) {
283 mSyncStore.clear(); 287 mSyncStore.clear();
@@ -311,6 +315,12 @@ Sink::Storage::DataStore::DataStore::Transaction &Synchronizer::syncTransaction(
311 return mSyncTransaction; 315 return mSyncTransaction;
312} 316}
313 317
318void Synchronizer::revisionChanged()
319{
320 mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::ChangeReplay};
321 processSyncQueue().exec();
322}
323
314bool Synchronizer::canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) 324bool Synchronizer::canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value)
315{ 325{
316 Sink::EntityBuffer buffer(value); 326 Sink::EntityBuffer buffer(value);
@@ -334,7 +344,6 @@ KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray
334 Q_ASSERT(!mSyncStore); 344 Q_ASSERT(!mSyncStore);
335 Q_ASSERT(!mSyncTransaction); 345 Q_ASSERT(!mSyncTransaction);
336 mEntityStore->startTransaction(Storage::DataStore::ReadOnly); 346 mEntityStore->startTransaction(Storage::DataStore::ReadOnly);
337 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::ReadWrite);
338 347
339 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; 348 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation;
340 const auto uid = Sink::Storage::DataStore::uidFromKey(key); 349 const auto uid = Sink::Storage::DataStore::uidFromKey(key);
@@ -345,6 +354,9 @@ KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray
345 oldRemoteId = syncStore().resolveLocalId(type, uid); 354 oldRemoteId = syncStore().resolveLocalId(type, uid);
346 if (oldRemoteId.isEmpty()) { 355 if (oldRemoteId.isEmpty()) {
347 SinkWarning() << "Couldn't find the remote id for: " << type << uid; 356 SinkWarning() << "Couldn't find the remote id for: " << type << uid;
357 mSyncStore.clear();
358 mSyncTransaction.abort();
359 mEntityStore->abortTransaction();
348 return KAsync::error<void>(1, "Couldn't find the remote id."); 360 return KAsync::error<void>(1, "Couldn't find the remote id.");
349 } 361 }
350 } 362 }
diff --git a/common/synchronizer.h b/common/synchronizer.h
index 0a51f54..9a71869 100644
--- a/common/synchronizer.h
+++ b/common/synchronizer.h
@@ -55,6 +55,9 @@ public:
55 void commit(); 55 void commit();
56 Sink::Storage::DataStore::Transaction &syncTransaction(); 56 Sink::Storage::DataStore::Transaction &syncTransaction();
57 57
58public slots:
59 virtual void revisionChanged() Q_DECL_OVERRIDE;
60
58protected: 61protected:
59 ///Base implementation calls the replay$Type calls 62 ///Base implementation calls the replay$Type calls
60 virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; 63 virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE;
@@ -108,13 +111,26 @@ protected:
108 virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) = 0; 111 virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) = 0;
109 112
110 struct SyncRequest { 113 struct SyncRequest {
114 enum RequestType {
115 Synchronization,
116 ChangeReplay
117 };
118
111 SyncRequest(const Sink::QueryBase &q) 119 SyncRequest(const Sink::QueryBase &q)
112 : flushQueue(false), 120 : flushQueue(false),
121 requestType(Synchronization),
113 query(q) 122 query(q)
114 { 123 {
115 } 124 }
116 125
126 SyncRequest(RequestType type)
127 : flushQueue(false),
128 requestType(type)
129 {
130 }
131
117 bool flushQueue; 132 bool flushQueue;
133 RequestType requestType;
118 Sink::QueryBase query; 134 Sink::QueryBase query;
119 }; 135 };
120 136