summaryrefslogtreecommitdiffstats
path: root/common/changereplay.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/changereplay.cpp')
-rw-r--r--common/changereplay.cpp102
1 files changed, 102 insertions, 0 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp
new file mode 100644
index 0000000..2447b6e
--- /dev/null
+++ b/common/changereplay.cpp
@@ -0,0 +1,102 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#include "changereplay.h"
21
22#include "entitybuffer.h"
23#include "log.h"
24#include "definitions.h"
25#include "bufferutils.h"
26
27using namespace Sink;
28
29#undef DEBUG_AREA
30#define DEBUG_AREA "resource.changereplay"
31
32ChangeReplay::ChangeReplay(const QByteArray &resourceName)
33 : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite)
34{
35 Trace() << "Created change replay: " << resourceName;
36}
37
38qint64 ChangeReplay::getLastReplayedRevision()
39{
40 qint64 lastReplayedRevision = 0;
41 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly);
42 replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
43 [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
44 lastReplayedRevision = value.toLongLong();
45 return false;
46 },
47 [](const Storage::Error &) {});
48 return lastReplayedRevision;
49}
50
51bool ChangeReplay::allChangesReplayed()
52{
53 const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
54 Warning() << error.message;
55 }));
56 const qint64 lastReplayedRevision = getLastReplayedRevision();
57 Trace() << "All changes replayed " << topRevision << lastReplayedRevision;
58 return (lastReplayedRevision >= topRevision);
59}
60
61void ChangeReplay::revisionChanged()
62{
63 auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
64 Warning() << error.message;
65 });
66 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) {
67 Warning() << error.message;
68 });
69 qint64 lastReplayedRevision = 1;
70 replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
71 [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
72 lastReplayedRevision = value.toLongLong();
73 return false;
74 },
75 [](const Storage::Error &) {});
76 const qint64 topRevision = Storage::maxRevision(mainStoreTransaction);
77
78 Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision;
79 if (lastReplayedRevision <= topRevision) {
80 qint64 revision = lastReplayedRevision;
81 for (; revision <= topRevision; revision++) {
82 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision);
83 const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision);
84 const auto key = Storage::assembleKey(uid, revision);
85 Storage::mainDatabase(mainStoreTransaction, type)
86 .scan(key,
87 [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool {
88 replay(type, key, value).exec();
89 // TODO make for loop async, and pass to async replay function together with type
90 Trace() << "Replaying " << key;
91 return false;
92 },
93 [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; });
94 }
95 revision--;
96 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision));
97 replayStoreTransaction.commit();
98 Trace() << "Replayed until " << revision;
99 }
100 emit changesReplayed();
101}
102