summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/changereplay.cpp144
-rw-r--r--common/changereplay.h4
-rw-r--r--common/sourcewriteback.cpp16
-rw-r--r--common/sourcewriteback.h1
4 files changed, 107 insertions, 58 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp
index 638a30d..fbd556f 100644
--- a/common/changereplay.cpp
+++ b/common/changereplay.cpp
@@ -59,67 +59,103 @@ bool ChangeReplay::allChangesReplayed()
59 return (lastReplayedRevision >= topRevision); 59 return (lastReplayedRevision >= topRevision);
60} 60}
61 61
62KAsync::Job<void> ChangeReplay::replayNextRevision() 62void ChangeReplay::recordReplayedRevision(qint64 revision)
63{ 63{
64 mReplayInProgress = true; 64 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) {
65 auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
66 SinkWarning() << error.message;
67 });
68 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
69 SinkWarning() << error.message; 65 SinkWarning() << error.message;
70 }); 66 });
71 qint64 lastReplayedRevision = 0; 67 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision));
72 replayStoreTransaction.openDatabase().scan("lastReplayedRevision", 68 replayStoreTransaction.commit();
73 [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { 69};
74 lastReplayedRevision = value.toLongLong();
75 return false;
76 },
77 [](const Storage::Error &) {});
78 const qint64 topRevision = Storage::maxRevision(mainStoreTransaction);
79 70
80 auto recordReplayedRevision = [this](qint64 revision) { 71KAsync::Job<void> ChangeReplay::replayNextRevision()
81 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { 72{
82 SinkWarning() << error.message; 73 auto lastReplayedRevision = QSharedPointer<qint64>::create(0);
83 }); 74 auto topRevision = QSharedPointer<qint64>::create(0);
84 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); 75 return KAsync::start<void>([this, lastReplayedRevision, topRevision]() {
85 replayStoreTransaction.commit(); 76 mReplayInProgress = true;
86 }; 77 mMainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
87 78 SinkWarning() << error.message;
88 if (lastReplayedRevision < topRevision) { 79 });
89 SinkTrace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; 80 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
90 qint64 revision = lastReplayedRevision + 1; 81 SinkWarning() << error.message;
91 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); 82 });
92 const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); 83 replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
93 const auto key = Storage::assembleKey(uid, revision); 84 [lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
94 KAsync::Job<void> replayJob = KAsync::null<void>(); 85 *lastReplayedRevision = value.toLongLong();
95 Storage::mainDatabase(mainStoreTransaction, type)
96 .scan(key,
97 [&lastReplayedRevision, type, this, &replayJob](const QByteArray &key, const QByteArray &value) -> bool {
98 SinkTrace() << "Replaying " << key;
99 replayJob = replay(type, key, value);
100 return false; 86 return false;
101 }, 87 },
102 [key](const Storage::Error &) { SinkError() << "Failed to replay change " << key; }); 88 [](const Storage::Error &) {});
103 return replayJob.then<void>([this, revision, recordReplayedRevision]() { 89 *topRevision = Storage::maxRevision(mMainStoreTransaction);
104 SinkTrace() << "Replayed until " << revision; 90 SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision;
105 recordReplayedRevision(revision); 91 })
106 //replay until we're done 92 .then(KAsync::dowhile(
107 QTimer::singleShot(0, this, [this]() { 93 [this, lastReplayedRevision, topRevision](KAsync::Future<bool> &future) {
108 replayNextRevision().exec(); 94 if (*lastReplayedRevision >= *topRevision) {
109 }); 95 future.setValue(false);
110 }, 96 future.setFinished();
111 [this, revision, recordReplayedRevision](int, QString) { 97 return;
112 SinkTrace() << "Change replay failed" << revision; 98 }
113 //We're probably not online or so, so postpone retrying 99
114 mReplayInProgress = false; 100 qint64 revision = *lastReplayedRevision + 1;
115 emit changesReplayed(); 101 KAsync::Job<void> replayJob = KAsync::null<void>();
102 while (revision <= *topRevision) {
103 const auto uid = Storage::getUidFromRevision(mMainStoreTransaction, revision);
104 const auto type = Storage::getTypeFromRevision(mMainStoreTransaction, revision);
105 const auto key = Storage::assembleKey(uid, revision);
106 bool exitLoop = false;
107 Storage::mainDatabase(mMainStoreTransaction, type)
108 .scan(key,
109 [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool {
110 SinkTrace() << "Replaying " << key;
111 if (canReplay(type, key, value)) {
112 replayJob = replay(type, key, value).then<void>([this, revision, lastReplayedRevision]() {
113 recordReplayedRevision(revision);
114 *lastReplayedRevision = revision;
115 },
116 [revision](int, QString) {
117 SinkTrace() << "Change replay failed" << revision;
118 });
119 exitLoop = true;
120 } else {
121 *lastReplayedRevision = revision;
122 }
123 return false;
124 },
125 [key](const Storage::Error &) { SinkError() << "Failed to replay change " << key; });
126 if (exitLoop) {
127 break;
128 }
129 revision++;
130 }
131 replayJob.then<void>([this, revision, lastReplayedRevision, topRevision, &future]() {
132 SinkTrace() << "Replayed until " << revision;
133 recordReplayedRevision(*lastReplayedRevision);
134 QTimer::singleShot(0, [&future, lastReplayedRevision, topRevision]() {
135 future.setValue((*lastReplayedRevision < *topRevision));
136 future.setFinished();
137 });
138 },
139 [this, revision, &future](int, QString) {
140 SinkTrace() << "Change replay failed" << revision;
141 //We're probably not online or so, so postpone retrying
142 future.setValue(false);
143 future.setFinished();
144 }).exec();
145
146 }))
147 .then<void>([this, lastReplayedRevision]() {
148 recordReplayedRevision(*lastReplayedRevision);
149 mMainStoreTransaction.abort();
150 if (allChangesReplayed()) {
151 mReplayInProgress = false;
152 emit changesReplayed();
153 } else {
154 QTimer::singleShot(0, [this]() {
155 replayNextRevision().exec();
156 });
157 }
116 }); 158 });
117 } else {
118 SinkTrace() << "No changes to replay";
119 mReplayInProgress = false;
120 emit changesReplayed();
121 }
122 return KAsync::null<void>();
123} 159}
124 160
125void ChangeReplay::revisionChanged() 161void ChangeReplay::revisionChanged()
diff --git a/common/changereplay.h b/common/changereplay.h
index 6c1c1db..88d6ce3 100644
--- a/common/changereplay.h
+++ b/common/changereplay.h
@@ -52,12 +52,15 @@ public slots:
52 52
53protected: 53protected:
54 virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; 54 virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0;
55 virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0;
55 Sink::Storage mStorage; 56 Sink::Storage mStorage;
56 57
57private: 58private:
59 void recordReplayedRevision(qint64 revision);
58 KAsync::Job<void> replayNextRevision(); 60 KAsync::Job<void> replayNextRevision();
59 Sink::Storage mChangeReplayStore; 61 Sink::Storage mChangeReplayStore;
60 bool mReplayInProgress; 62 bool mReplayInProgress;
63 Sink::Storage::Transaction mMainStoreTransaction;
61}; 64};
62 65
63class NullChangeReplay : public ChangeReplay 66class NullChangeReplay : public ChangeReplay
@@ -65,6 +68,7 @@ class NullChangeReplay : public ChangeReplay
65public: 68public:
66 NullChangeReplay(const QByteArray &resourceName) : ChangeReplay(resourceName) {} 69 NullChangeReplay(const QByteArray &resourceName) : ChangeReplay(resourceName) {}
67 KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE { return KAsync::null<void>(); } 70 KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE { return KAsync::null<void>(); }
71 bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE { return false; }
68}; 72};
69 73
70} 74}
diff --git a/common/sourcewriteback.cpp b/common/sourcewriteback.cpp
index 7d21ea6..fe996cb 100644
--- a/common/sourcewriteback.cpp
+++ b/common/sourcewriteback.cpp
@@ -55,18 +55,26 @@ RemoteIdMap &SourceWriteBack::syncStore()
55 return *mSyncStore; 55 return *mSyncStore;
56} 56}
57 57
58KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) 58bool SourceWriteBack::canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value)
59{ 59{
60 SinkTrace() << "Replaying" << type << key;
61
62 Sink::EntityBuffer buffer(value); 60 Sink::EntityBuffer buffer(value);
63 const Sink::Entity &entity = buffer.entity(); 61 const Sink::Entity &entity = buffer.entity();
64 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); 62 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
65 Q_ASSERT(metadataBuffer); 63 Q_ASSERT(metadataBuffer);
66 if (!metadataBuffer->replayToSource()) { 64 if (!metadataBuffer->replayToSource()) {
67 SinkTrace() << "Change is coming from the source"; 65 SinkTrace() << "Change is coming from the source";
68 return KAsync::null<void>();
69 } 66 }
67 return metadataBuffer->replayToSource();
68}
69
70KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value)
71{
72 SinkTrace() << "Replaying" << type << key;
73
74 Sink::EntityBuffer buffer(value);
75 const Sink::Entity &entity = buffer.entity();
76 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
77 Q_ASSERT(metadataBuffer);
70 Q_ASSERT(!mSyncStore); 78 Q_ASSERT(!mSyncStore);
71 Q_ASSERT(!mEntityStore); 79 Q_ASSERT(!mEntityStore);
72 Q_ASSERT(!mTransaction); 80 Q_ASSERT(!mTransaction);
diff --git a/common/sourcewriteback.h b/common/sourcewriteback.h
index 8531ff5..8031573 100644
--- a/common/sourcewriteback.h
+++ b/common/sourcewriteback.h
@@ -39,6 +39,7 @@ public:
39protected: 39protected:
40 ///Base implementation calls the replay$Type calls 40 ///Base implementation calls the replay$Type calls
41 virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; 41 virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE;
42 virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE;
42 43
43protected: 44protected:
44 ///Implement to write back changes to the server 45 ///Implement to write back changes to the server