diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-06-02 13:31:33 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-06-02 13:31:33 +0200 |
commit | c8587bca69546a3a5fd1b2f6c09aff89095a90f8 (patch) | |
tree | 5bda73c2615b6515d940fd53f5ce45bf9964fcad /common/changereplay.cpp | |
parent | a5d693e5b71caeb21d0337e93d62e49584ae6890 (diff) | |
download | sink-c8587bca69546a3a5fd1b2f6c09aff89095a90f8.tar.gz sink-c8587bca69546a3a5fd1b2f6c09aff89095a90f8.zip |
Non blocking change-replay
Diffstat (limited to 'common/changereplay.cpp')
-rw-r--r-- | common/changereplay.cpp | 70 |
1 files changed, 44 insertions, 26 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp index 63c41c8..aebfdb0 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp | |||
@@ -30,7 +30,7 @@ using namespace Sink; | |||
30 | #define DEBUG_AREA "resource.changereplay" | 30 | #define DEBUG_AREA "resource.changereplay" |
31 | 31 | ||
32 | ChangeReplay::ChangeReplay(const QByteArray &resourceName) | 32 | ChangeReplay::ChangeReplay(const QByteArray &resourceName) |
33 | : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite) | 33 | : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayInProgress(false) |
34 | { | 34 | { |
35 | Trace() << "Created change replay: " << resourceName; | 35 | Trace() << "Created change replay: " << resourceName; |
36 | } | 36 | } |
@@ -58,15 +58,16 @@ bool ChangeReplay::allChangesReplayed() | |||
58 | return (lastReplayedRevision >= topRevision); | 58 | return (lastReplayedRevision >= topRevision); |
59 | } | 59 | } |
60 | 60 | ||
61 | void ChangeReplay::revisionChanged() | 61 | KAsync::Job<void> ChangeReplay::replayNextRevision() |
62 | { | 62 | { |
63 | mReplayInProgress = true; | ||
63 | auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | 64 | auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { |
64 | Warning() << error.message; | 65 | Warning() << error.message; |
65 | }); | 66 | }); |
66 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { | 67 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { |
67 | Warning() << error.message; | 68 | Warning() << error.message; |
68 | }); | 69 | }); |
69 | qint64 lastReplayedRevision = 1; | 70 | qint64 lastReplayedRevision = 0; |
70 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | 71 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", |
71 | [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | 72 | [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { |
72 | lastReplayedRevision = value.toLongLong(); | 73 | lastReplayedRevision = value.toLongLong(); |
@@ -75,28 +76,45 @@ void ChangeReplay::revisionChanged() | |||
75 | [](const Storage::Error &) {}); | 76 | [](const Storage::Error &) {}); |
76 | const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); | 77 | const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); |
77 | 78 | ||
78 | Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; | 79 | if (lastReplayedRevision < topRevision) { |
79 | if (lastReplayedRevision <= topRevision) { | 80 | Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; |
80 | qint64 revision = lastReplayedRevision; | 81 | qint64 revision = lastReplayedRevision + 1; |
81 | for (; revision <= topRevision; revision++) { | 82 | const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); |
82 | const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); | 83 | const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); |
83 | const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); | 84 | const auto key = Storage::assembleKey(uid, revision); |
84 | const auto key = Storage::assembleKey(uid, revision); | 85 | KAsync::Job<void> replayJob = KAsync::null<void>(); |
85 | Storage::mainDatabase(mainStoreTransaction, type) | 86 | Storage::mainDatabase(mainStoreTransaction, type) |
86 | .scan(key, | 87 | .scan(key, |
87 | [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { | 88 | [&lastReplayedRevision, type, this, &replayJob](const QByteArray &key, const QByteArray &value) -> bool { |
88 | Trace() << "Replaying " << key; | 89 | Trace() << "Replaying " << key; |
89 | replay(type, key, value).exec(); | 90 | replayJob = replay(type, key, value); |
90 | // TODO make for loop async, and pass to async replay function together with type | 91 | // TODO make for loop async, and pass to async replay function together with type |
91 | return false; | 92 | return false; |
92 | }, | 93 | }, |
93 | [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; }); | 94 | [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; }); |
94 | } | 95 | return replayJob.then<void>([this, revision]() { |
95 | revision--; | 96 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { |
96 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); | 97 | Warning() << error.message; |
97 | replayStoreTransaction.commit(); | 98 | }); |
98 | Trace() << "Replayed until " << revision; | 99 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); |
100 | replayStoreTransaction.commit(); | ||
101 | Trace() << "Replayed until " << revision; | ||
102 | }).then<void>([this]() { | ||
103 | //replay until we're done | ||
104 | replayNextRevision().exec(); | ||
105 | }); | ||
106 | } else { | ||
107 | Trace() << "No changes to replay"; | ||
108 | mReplayInProgress = false; | ||
109 | emit changesReplayed(); | ||
110 | } | ||
111 | return KAsync::null<void>(); | ||
112 | } | ||
113 | |||
114 | void ChangeReplay::revisionChanged() | ||
115 | { | ||
116 | if (!mReplayInProgress) { | ||
117 | replayNextRevision().exec(); | ||
99 | } | 118 | } |
100 | emit changesReplayed(); | ||
101 | } | 119 | } |
102 | 120 | ||