summaryrefslogtreecommitdiffstats
path: root/common/changereplay.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-07-20 09:31:25 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-09-15 16:14:19 +0200
commitdc6bc885b70d8dbada622c22f8d620084b798648 (patch)
treec12f3fdcbea12f172b48a8ba7485cacfd0fc86bf /common/changereplay.cpp
parenta1b86c269f3d2a64d63f001df33bb3e1932423a0 (diff)
downloadsink-dc6bc885b70d8dbada622c22f8d620084b798648.tar.gz
sink-dc6bc885b70d8dbada622c22f8d620084b798648.zip
Don't create a transaction for every revision that we don't replay.
This had a significant performance impact when i.e. syncing a folder with 10k messages.
Diffstat (limited to 'common/changereplay.cpp')
-rw-r--r--common/changereplay.cpp144
1 files changed, 90 insertions, 54 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp
index 638a30d..fbd556f 100644
--- a/common/changereplay.cpp
+++ b/common/changereplay.cpp
@@ -59,67 +59,103 @@ bool ChangeReplay::allChangesReplayed()
59 return (lastReplayedRevision >= topRevision); 59 return (lastReplayedRevision >= topRevision);
60} 60}
61 61
62KAsync::Job<void> ChangeReplay::replayNextRevision() 62void ChangeReplay::recordReplayedRevision(qint64 revision)
63{ 63{
64 mReplayInProgress = true; 64 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) {
65 auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
66 SinkWarning() << error.message;
67 });
68 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
69 SinkWarning() << error.message; 65 SinkWarning() << error.message;
70 }); 66 });
71 qint64 lastReplayedRevision = 0; 67 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision));
72 replayStoreTransaction.openDatabase().scan("lastReplayedRevision", 68 replayStoreTransaction.commit();
73 [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { 69};
74 lastReplayedRevision = value.toLongLong();
75 return false;
76 },
77 [](const Storage::Error &) {});
78 const qint64 topRevision = Storage::maxRevision(mainStoreTransaction);
79 70
80 auto recordReplayedRevision = [this](qint64 revision) { 71KAsync::Job<void> ChangeReplay::replayNextRevision()
81 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { 72{
82 SinkWarning() << error.message; 73 auto lastReplayedRevision = QSharedPointer<qint64>::create(0);
83 }); 74 auto topRevision = QSharedPointer<qint64>::create(0);
84 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); 75 return KAsync::start<void>([this, lastReplayedRevision, topRevision]() {
85 replayStoreTransaction.commit(); 76 mReplayInProgress = true;
86 }; 77 mMainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
87 78 SinkWarning() << error.message;
88 if (lastReplayedRevision < topRevision) { 79 });
89 SinkTrace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; 80 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
90 qint64 revision = lastReplayedRevision + 1; 81 SinkWarning() << error.message;
91 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); 82 });
92 const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); 83 replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
93 const auto key = Storage::assembleKey(uid, revision); 84 [lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
94 KAsync::Job<void> replayJob = KAsync::null<void>(); 85 *lastReplayedRevision = value.toLongLong();
95 Storage::mainDatabase(mainStoreTransaction, type)
96 .scan(key,
97 [&lastReplayedRevision, type, this, &replayJob](const QByteArray &key, const QByteArray &value) -> bool {
98 SinkTrace() << "Replaying " << key;
99 replayJob = replay(type, key, value);
100 return false; 86 return false;
101 }, 87 },
102 [key](const Storage::Error &) { SinkError() << "Failed to replay change " << key; }); 88 [](const Storage::Error &) {});
103 return replayJob.then<void>([this, revision, recordReplayedRevision]() { 89 *topRevision = Storage::maxRevision(mMainStoreTransaction);
104 SinkTrace() << "Replayed until " << revision; 90 SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision;
105 recordReplayedRevision(revision); 91 })
106 //replay until we're done 92 .then(KAsync::dowhile(
107 QTimer::singleShot(0, this, [this]() { 93 [this, lastReplayedRevision, topRevision](KAsync::Future<bool> &future) {
108 replayNextRevision().exec(); 94 if (*lastReplayedRevision >= *topRevision) {
109 }); 95 future.setValue(false);
110 }, 96 future.setFinished();
111 [this, revision, recordReplayedRevision](int, QString) { 97 return;
112 SinkTrace() << "Change replay failed" << revision; 98 }
113 //We're probably not online or so, so postpone retrying 99
114 mReplayInProgress = false; 100 qint64 revision = *lastReplayedRevision + 1;
115 emit changesReplayed(); 101 KAsync::Job<void> replayJob = KAsync::null<void>();
102 while (revision <= *topRevision) {
103 const auto uid = Storage::getUidFromRevision(mMainStoreTransaction, revision);
104 const auto type = Storage::getTypeFromRevision(mMainStoreTransaction, revision);
105 const auto key = Storage::assembleKey(uid, revision);
106 bool exitLoop = false;
107 Storage::mainDatabase(mMainStoreTransaction, type)
108 .scan(key,
109 [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool {
110 SinkTrace() << "Replaying " << key;
111 if (canReplay(type, key, value)) {
112 replayJob = replay(type, key, value).then<void>([this, revision, lastReplayedRevision]() {
113 recordReplayedRevision(revision);
114 *lastReplayedRevision = revision;
115 },
116 [revision](int, QString) {
117 SinkTrace() << "Change replay failed" << revision;
118 });
119 exitLoop = true;
120 } else {
121 *lastReplayedRevision = revision;
122 }
123 return false;
124 },
125 [key](const Storage::Error &) { SinkError() << "Failed to replay change " << key; });
126 if (exitLoop) {
127 break;
128 }
129 revision++;
130 }
131 replayJob.then<void>([this, revision, lastReplayedRevision, topRevision, &future]() {
132 SinkTrace() << "Replayed until " << revision;
133 recordReplayedRevision(*lastReplayedRevision);
134 QTimer::singleShot(0, [&future, lastReplayedRevision, topRevision]() {
135 future.setValue((*lastReplayedRevision < *topRevision));
136 future.setFinished();
137 });
138 },
139 [this, revision, &future](int, QString) {
140 SinkTrace() << "Change replay failed" << revision;
141 //We're probably not online or so, so postpone retrying
142 future.setValue(false);
143 future.setFinished();
144 }).exec();
145
146 }))
147 .then<void>([this, lastReplayedRevision]() {
148 recordReplayedRevision(*lastReplayedRevision);
149 mMainStoreTransaction.abort();
150 if (allChangesReplayed()) {
151 mReplayInProgress = false;
152 emit changesReplayed();
153 } else {
154 QTimer::singleShot(0, [this]() {
155 replayNextRevision().exec();
156 });
157 }
116 }); 158 });
117 } else {
118 SinkTrace() << "No changes to replay";
119 mReplayInProgress = false;
120 emit changesReplayed();
121 }
122 return KAsync::null<void>();
123} 159}
124 160
125void ChangeReplay::revisionChanged() 161void ChangeReplay::revisionChanged()