summaryrefslogtreecommitdiffstats
path: root/common/changereplay.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-06-02 13:31:33 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-06-02 13:31:33 +0200
commitc8587bca69546a3a5fd1b2f6c09aff89095a90f8 (patch)
tree5bda73c2615b6515d940fd53f5ce45bf9964fcad /common/changereplay.cpp
parenta5d693e5b71caeb21d0337e93d62e49584ae6890 (diff)
downloadsink-c8587bca69546a3a5fd1b2f6c09aff89095a90f8.tar.gz
sink-c8587bca69546a3a5fd1b2f6c09aff89095a90f8.zip
Non blocking change-replay
Diffstat (limited to 'common/changereplay.cpp')
-rw-r--r--common/changereplay.cpp70
1 files changed, 44 insertions, 26 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp
index 63c41c8..aebfdb0 100644
--- a/common/changereplay.cpp
+++ b/common/changereplay.cpp
@@ -30,7 +30,7 @@ using namespace Sink;
30#define DEBUG_AREA "resource.changereplay" 30#define DEBUG_AREA "resource.changereplay"
31 31
32ChangeReplay::ChangeReplay(const QByteArray &resourceName) 32ChangeReplay::ChangeReplay(const QByteArray &resourceName)
33 : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite) 33 : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayInProgress(false)
34{ 34{
35 Trace() << "Created change replay: " << resourceName; 35 Trace() << "Created change replay: " << resourceName;
36} 36}
@@ -58,15 +58,16 @@ bool ChangeReplay::allChangesReplayed()
58 return (lastReplayedRevision >= topRevision); 58 return (lastReplayedRevision >= topRevision);
59} 59}
60 60
61void ChangeReplay::revisionChanged() 61KAsync::Job<void> ChangeReplay::replayNextRevision()
62{ 62{
63 mReplayInProgress = true;
63 auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 64 auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
64 Warning() << error.message; 65 Warning() << error.message;
65 }); 66 });
66 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { 67 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
67 Warning() << error.message; 68 Warning() << error.message;
68 }); 69 });
69 qint64 lastReplayedRevision = 1; 70 qint64 lastReplayedRevision = 0;
70 replayStoreTransaction.openDatabase().scan("lastReplayedRevision", 71 replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
71 [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { 72 [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
72 lastReplayedRevision = value.toLongLong(); 73 lastReplayedRevision = value.toLongLong();
@@ -75,28 +76,45 @@ void ChangeReplay::revisionChanged()
75 [](const Storage::Error &) {}); 76 [](const Storage::Error &) {});
76 const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); 77 const qint64 topRevision = Storage::maxRevision(mainStoreTransaction);
77 78
78 Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; 79 if (lastReplayedRevision < topRevision) {
79 if (lastReplayedRevision <= topRevision) { 80 Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision;
80 qint64 revision = lastReplayedRevision; 81 qint64 revision = lastReplayedRevision + 1;
81 for (; revision <= topRevision; revision++) { 82 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision);
82 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); 83 const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision);
83 const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); 84 const auto key = Storage::assembleKey(uid, revision);
84 const auto key = Storage::assembleKey(uid, revision); 85 KAsync::Job<void> replayJob = KAsync::null<void>();
85 Storage::mainDatabase(mainStoreTransaction, type) 86 Storage::mainDatabase(mainStoreTransaction, type)
86 .scan(key, 87 .scan(key,
87 [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { 88 [&lastReplayedRevision, type, this, &replayJob](const QByteArray &key, const QByteArray &value) -> bool {
88 Trace() << "Replaying " << key; 89 Trace() << "Replaying " << key;
89 replay(type, key, value).exec(); 90 replayJob = replay(type, key, value);
90 // TODO make for loop async, and pass to async replay function together with type 91 // TODO make for loop async, and pass to async replay function together with type
91 return false; 92 return false;
92 }, 93 },
93 [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; }); 94 [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; });
94 } 95 return replayJob.then<void>([this, revision]() {
95 revision--; 96 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) {
96 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); 97 Warning() << error.message;
97 replayStoreTransaction.commit(); 98 });
98 Trace() << "Replayed until " << revision; 99 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision));
100 replayStoreTransaction.commit();
101 Trace() << "Replayed until " << revision;
102 }).then<void>([this]() {
103 //replay until we're done
104 replayNextRevision().exec();
105 });
106 } else {
107 Trace() << "No changes to replay";
108 mReplayInProgress = false;
109 emit changesReplayed();
110 }
111 return KAsync::null<void>();
112}
113
114void ChangeReplay::revisionChanged()
115{
116 if (!mReplayInProgress) {
117 replayNextRevision().exec();
99 } 118 }
100 emit changesReplayed();
101} 119}
102 120