diff options
-rw-r--r-- | common/changereplay.cpp | 144 | ||||
-rw-r--r-- | common/changereplay.h | 4 | ||||
-rw-r--r-- | common/sourcewriteback.cpp | 16 | ||||
-rw-r--r-- | common/sourcewriteback.h | 1 |
4 files changed, 107 insertions, 58 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp index 638a30d..fbd556f 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp | |||
@@ -59,67 +59,103 @@ bool ChangeReplay::allChangesReplayed() | |||
59 | return (lastReplayedRevision >= topRevision); | 59 | return (lastReplayedRevision >= topRevision); |
60 | } | 60 | } |
61 | 61 | ||
62 | KAsync::Job<void> ChangeReplay::replayNextRevision() | 62 | void ChangeReplay::recordReplayedRevision(qint64 revision) |
63 | { | 63 | { |
64 | mReplayInProgress = true; | 64 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { |
65 | auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | ||
66 | SinkWarning() << error.message; | ||
67 | }); | ||
68 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | ||
69 | SinkWarning() << error.message; | 65 | SinkWarning() << error.message; |
70 | }); | 66 | }); |
71 | qint64 lastReplayedRevision = 0; | 67 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); |
72 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | 68 | replayStoreTransaction.commit(); |
73 | [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | 69 | }; |
74 | lastReplayedRevision = value.toLongLong(); | ||
75 | return false; | ||
76 | }, | ||
77 | [](const Storage::Error &) {}); | ||
78 | const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); | ||
79 | 70 | ||
80 | auto recordReplayedRevision = [this](qint64 revision) { | 71 | KAsync::Job<void> ChangeReplay::replayNextRevision() |
81 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { | 72 | { |
82 | SinkWarning() << error.message; | 73 | auto lastReplayedRevision = QSharedPointer<qint64>::create(0); |
83 | }); | 74 | auto topRevision = QSharedPointer<qint64>::create(0); |
84 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); | 75 | return KAsync::start<void>([this, lastReplayedRevision, topRevision]() { |
85 | replayStoreTransaction.commit(); | 76 | mReplayInProgress = true; |
86 | }; | 77 | mMainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { |
87 | 78 | SinkWarning() << error.message; | |
88 | if (lastReplayedRevision < topRevision) { | 79 | }); |
89 | SinkTrace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; | 80 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { |
90 | qint64 revision = lastReplayedRevision + 1; | 81 | SinkWarning() << error.message; |
91 | const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); | 82 | }); |
92 | const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); | 83 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", |
93 | const auto key = Storage::assembleKey(uid, revision); | 84 | [lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { |
94 | KAsync::Job<void> replayJob = KAsync::null<void>(); | 85 | *lastReplayedRevision = value.toLongLong(); |
95 | Storage::mainDatabase(mainStoreTransaction, type) | ||
96 | .scan(key, | ||
97 | [&lastReplayedRevision, type, this, &replayJob](const QByteArray &key, const QByteArray &value) -> bool { | ||
98 | SinkTrace() << "Replaying " << key; | ||
99 | replayJob = replay(type, key, value); | ||
100 | return false; | 86 | return false; |
101 | }, | 87 | }, |
102 | [key](const Storage::Error &) { SinkError() << "Failed to replay change " << key; }); | 88 | [](const Storage::Error &) {}); |
103 | return replayJob.then<void>([this, revision, recordReplayedRevision]() { | 89 | *topRevision = Storage::maxRevision(mMainStoreTransaction); |
104 | SinkTrace() << "Replayed until " << revision; | 90 | SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; |
105 | recordReplayedRevision(revision); | 91 | }) |
106 | //replay until we're done | 92 | .then(KAsync::dowhile( |
107 | QTimer::singleShot(0, this, [this]() { | 93 | [this, lastReplayedRevision, topRevision](KAsync::Future<bool> &future) { |
108 | replayNextRevision().exec(); | 94 | if (*lastReplayedRevision >= *topRevision) { |
109 | }); | 95 | future.setValue(false); |
110 | }, | 96 | future.setFinished(); |
111 | [this, revision, recordReplayedRevision](int, QString) { | 97 | return; |
112 | SinkTrace() << "Change replay failed" << revision; | 98 | } |
113 | //We're probably not online or so, so postpone retrying | 99 | |
114 | mReplayInProgress = false; | 100 | qint64 revision = *lastReplayedRevision + 1; |
115 | emit changesReplayed(); | 101 | KAsync::Job<void> replayJob = KAsync::null<void>(); |
102 | while (revision <= *topRevision) { | ||
103 | const auto uid = Storage::getUidFromRevision(mMainStoreTransaction, revision); | ||
104 | const auto type = Storage::getTypeFromRevision(mMainStoreTransaction, revision); | ||
105 | const auto key = Storage::assembleKey(uid, revision); | ||
106 | bool exitLoop = false; | ||
107 | Storage::mainDatabase(mMainStoreTransaction, type) | ||
108 | .scan(key, | ||
109 | [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool { | ||
110 | SinkTrace() << "Replaying " << key; | ||
111 | if (canReplay(type, key, value)) { | ||
112 | replayJob = replay(type, key, value).then<void>([this, revision, lastReplayedRevision]() { | ||
113 | recordReplayedRevision(revision); | ||
114 | *lastReplayedRevision = revision; | ||
115 | }, | ||
116 | [revision](int, QString) { | ||
117 | SinkTrace() << "Change replay failed" << revision; | ||
118 | }); | ||
119 | exitLoop = true; | ||
120 | } else { | ||
121 | *lastReplayedRevision = revision; | ||
122 | } | ||
123 | return false; | ||
124 | }, | ||
125 | [key](const Storage::Error &) { SinkError() << "Failed to replay change " << key; }); | ||
126 | if (exitLoop) { | ||
127 | break; | ||
128 | } | ||
129 | revision++; | ||
130 | } | ||
131 | replayJob.then<void>([this, revision, lastReplayedRevision, topRevision, &future]() { | ||
132 | SinkTrace() << "Replayed until " << revision; | ||
133 | recordReplayedRevision(*lastReplayedRevision); | ||
134 | QTimer::singleShot(0, [&future, lastReplayedRevision, topRevision]() { | ||
135 | future.setValue((*lastReplayedRevision < *topRevision)); | ||
136 | future.setFinished(); | ||
137 | }); | ||
138 | }, | ||
139 | [this, revision, &future](int, QString) { | ||
140 | SinkTrace() << "Change replay failed" << revision; | ||
141 | //We're probably not online or so, so postpone retrying | ||
142 | future.setValue(false); | ||
143 | future.setFinished(); | ||
144 | }).exec(); | ||
145 | |||
146 | })) | ||
147 | .then<void>([this, lastReplayedRevision]() { | ||
148 | recordReplayedRevision(*lastReplayedRevision); | ||
149 | mMainStoreTransaction.abort(); | ||
150 | if (allChangesReplayed()) { | ||
151 | mReplayInProgress = false; | ||
152 | emit changesReplayed(); | ||
153 | } else { | ||
154 | QTimer::singleShot(0, [this]() { | ||
155 | replayNextRevision().exec(); | ||
156 | }); | ||
157 | } | ||
116 | }); | 158 | }); |
117 | } else { | ||
118 | SinkTrace() << "No changes to replay"; | ||
119 | mReplayInProgress = false; | ||
120 | emit changesReplayed(); | ||
121 | } | ||
122 | return KAsync::null<void>(); | ||
123 | } | 159 | } |
124 | 160 | ||
125 | void ChangeReplay::revisionChanged() | 161 | void ChangeReplay::revisionChanged() |
diff --git a/common/changereplay.h b/common/changereplay.h index 6c1c1db..88d6ce3 100644 --- a/common/changereplay.h +++ b/common/changereplay.h | |||
@@ -52,12 +52,15 @@ public slots: | |||
52 | 52 | ||
53 | protected: | 53 | protected: |
54 | virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; | 54 | virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; |
55 | virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; | ||
55 | Sink::Storage mStorage; | 56 | Sink::Storage mStorage; |
56 | 57 | ||
57 | private: | 58 | private: |
59 | void recordReplayedRevision(qint64 revision); | ||
58 | KAsync::Job<void> replayNextRevision(); | 60 | KAsync::Job<void> replayNextRevision(); |
59 | Sink::Storage mChangeReplayStore; | 61 | Sink::Storage mChangeReplayStore; |
60 | bool mReplayInProgress; | 62 | bool mReplayInProgress; |
63 | Sink::Storage::Transaction mMainStoreTransaction; | ||
61 | }; | 64 | }; |
62 | 65 | ||
63 | class NullChangeReplay : public ChangeReplay | 66 | class NullChangeReplay : public ChangeReplay |
@@ -65,6 +68,7 @@ class NullChangeReplay : public ChangeReplay | |||
65 | public: | 68 | public: |
66 | NullChangeReplay(const QByteArray &resourceName) : ChangeReplay(resourceName) {} | 69 | NullChangeReplay(const QByteArray &resourceName) : ChangeReplay(resourceName) {} |
67 | KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE { return KAsync::null<void>(); } | 70 | KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE { return KAsync::null<void>(); } |
71 | bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE { return false; } | ||
68 | }; | 72 | }; |
69 | 73 | ||
70 | } | 74 | } |
diff --git a/common/sourcewriteback.cpp b/common/sourcewriteback.cpp index 7d21ea6..fe996cb 100644 --- a/common/sourcewriteback.cpp +++ b/common/sourcewriteback.cpp | |||
@@ -55,18 +55,26 @@ RemoteIdMap &SourceWriteBack::syncStore() | |||
55 | return *mSyncStore; | 55 | return *mSyncStore; |
56 | } | 56 | } |
57 | 57 | ||
58 | KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) | 58 | bool SourceWriteBack::canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) |
59 | { | 59 | { |
60 | SinkTrace() << "Replaying" << type << key; | ||
61 | |||
62 | Sink::EntityBuffer buffer(value); | 60 | Sink::EntityBuffer buffer(value); |
63 | const Sink::Entity &entity = buffer.entity(); | 61 | const Sink::Entity &entity = buffer.entity(); |
64 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); | 62 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); |
65 | Q_ASSERT(metadataBuffer); | 63 | Q_ASSERT(metadataBuffer); |
66 | if (!metadataBuffer->replayToSource()) { | 64 | if (!metadataBuffer->replayToSource()) { |
67 | SinkTrace() << "Change is coming from the source"; | 65 | SinkTrace() << "Change is coming from the source"; |
68 | return KAsync::null<void>(); | ||
69 | } | 66 | } |
67 | return metadataBuffer->replayToSource(); | ||
68 | } | ||
69 | |||
70 | KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) | ||
71 | { | ||
72 | SinkTrace() << "Replaying" << type << key; | ||
73 | |||
74 | Sink::EntityBuffer buffer(value); | ||
75 | const Sink::Entity &entity = buffer.entity(); | ||
76 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); | ||
77 | Q_ASSERT(metadataBuffer); | ||
70 | Q_ASSERT(!mSyncStore); | 78 | Q_ASSERT(!mSyncStore); |
71 | Q_ASSERT(!mEntityStore); | 79 | Q_ASSERT(!mEntityStore); |
72 | Q_ASSERT(!mTransaction); | 80 | Q_ASSERT(!mTransaction); |
diff --git a/common/sourcewriteback.h b/common/sourcewriteback.h index 8531ff5..8031573 100644 --- a/common/sourcewriteback.h +++ b/common/sourcewriteback.h | |||
@@ -39,6 +39,7 @@ public: | |||
39 | protected: | 39 | protected: |
40 | ///Base implementation calls the replay$Type calls | 40 | ///Base implementation calls the replay$Type calls |
41 | virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; | 41 | virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; |
42 | virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; | ||
42 | 43 | ||
43 | protected: | 44 | protected: |
44 | ///Implement to write back changes to the server | 45 | ///Implement to write back changes to the server |