From dc6bc885b70d8dbada622c22f8d620084b798648 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 20 Jul 2016 09:31:25 +0200 Subject: Don't create a transaction for every revision that we don't replay. This had a significant performance impact when i.e. syncing a folder with 10k messages. --- common/changereplay.cpp | 144 ++++++++++++++++++++++++++++----------------- common/changereplay.h | 4 ++ common/sourcewriteback.cpp | 16 +++-- common/sourcewriteback.h | 1 + 4 files changed, 107 insertions(+), 58 deletions(-) diff --git a/common/changereplay.cpp b/common/changereplay.cpp index 638a30d..fbd556f 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp @@ -59,67 +59,103 @@ bool ChangeReplay::allChangesReplayed() return (lastReplayedRevision >= topRevision); } -KAsync::Job ChangeReplay::replayNextRevision() +void ChangeReplay::recordReplayedRevision(qint64 revision) { - mReplayInProgress = true; - auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { - SinkWarning() << error.message; - }); - auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { + auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { SinkWarning() << error.message; }); - qint64 lastReplayedRevision = 0; - replayStoreTransaction.openDatabase().scan("lastReplayedRevision", - [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { - lastReplayedRevision = value.toLongLong(); - return false; - }, - [](const Storage::Error &) {}); - const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); + replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); + replayStoreTransaction.commit(); +}; - auto recordReplayedRevision = [this](qint64 revision) { - auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { - SinkWarning() << error.message; - }); - replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); - replayStoreTransaction.commit(); - }; - - if (lastReplayedRevision < topRevision) { - SinkTrace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; - qint64 revision = lastReplayedRevision + 1; - const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); - const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); - const auto key = Storage::assembleKey(uid, revision); - KAsync::Job replayJob = KAsync::null(); - Storage::mainDatabase(mainStoreTransaction, type) - .scan(key, - [&lastReplayedRevision, type, this, &replayJob](const QByteArray &key, const QByteArray &value) -> bool { - SinkTrace() << "Replaying " << key; - replayJob = replay(type, key, value); +KAsync::Job ChangeReplay::replayNextRevision() +{ + auto lastReplayedRevision = QSharedPointer::create(0); + auto topRevision = QSharedPointer::create(0); + return KAsync::start([this, lastReplayedRevision, topRevision]() { + mReplayInProgress = true; + mMainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { + SinkWarning() << error.message; + }); + auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { + SinkWarning() << error.message; + }); + replayStoreTransaction.openDatabase().scan("lastReplayedRevision", + [lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { + *lastReplayedRevision = value.toLongLong(); return false; }, - [key](const Storage::Error &) { SinkError() << "Failed to replay change " << key; }); - return replayJob.then([this, revision, recordReplayedRevision]() { - SinkTrace() << "Replayed until " << revision; - recordReplayedRevision(revision); - //replay until we're done - QTimer::singleShot(0, this, [this]() { - replayNextRevision().exec(); - }); - }, - [this, revision, recordReplayedRevision](int, QString) { - SinkTrace() << "Change replay failed" << revision; - //We're probably not online or so, so postpone retrying - mReplayInProgress = false; - emit changesReplayed(); + [](const Storage::Error &) {}); + *topRevision = Storage::maxRevision(mMainStoreTransaction); + SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; + }) + .then(KAsync::dowhile( + [this, lastReplayedRevision, topRevision](KAsync::Future &future) { + if (*lastReplayedRevision >= *topRevision) { + future.setValue(false); + future.setFinished(); + return; + } + + qint64 revision = *lastReplayedRevision + 1; + KAsync::Job replayJob = KAsync::null(); + while (revision <= *topRevision) { + const auto uid = Storage::getUidFromRevision(mMainStoreTransaction, revision); + const auto type = Storage::getTypeFromRevision(mMainStoreTransaction, revision); + const auto key = Storage::assembleKey(uid, revision); + bool exitLoop = false; + Storage::mainDatabase(mMainStoreTransaction, type) + .scan(key, + [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool { + SinkTrace() << "Replaying " << key; + if (canReplay(type, key, value)) { + replayJob = replay(type, key, value).then([this, revision, lastReplayedRevision]() { + recordReplayedRevision(revision); + *lastReplayedRevision = revision; + }, + [revision](int, QString) { + SinkTrace() << "Change replay failed" << revision; + }); + exitLoop = true; + } else { + *lastReplayedRevision = revision; + } + return false; + }, + [key](const Storage::Error &) { SinkError() << "Failed to replay change " << key; }); + if (exitLoop) { + break; + } + revision++; + } + replayJob.then([this, revision, lastReplayedRevision, topRevision, &future]() { + SinkTrace() << "Replayed until " << revision; + recordReplayedRevision(*lastReplayedRevision); + QTimer::singleShot(0, [&future, lastReplayedRevision, topRevision]() { + future.setValue((*lastReplayedRevision < *topRevision)); + future.setFinished(); + }); + }, + [this, revision, &future](int, QString) { + SinkTrace() << "Change replay failed" << revision; + //We're probably not online or so, so postpone retrying + future.setValue(false); + future.setFinished(); + }).exec(); + + })) + .then([this, lastReplayedRevision]() { + recordReplayedRevision(*lastReplayedRevision); + mMainStoreTransaction.abort(); + if (allChangesReplayed()) { + mReplayInProgress = false; + emit changesReplayed(); + } else { + QTimer::singleShot(0, [this]() { + replayNextRevision().exec(); + }); + } }); - } else { - SinkTrace() << "No changes to replay"; - mReplayInProgress = false; - emit changesReplayed(); - } - return KAsync::null(); } void ChangeReplay::revisionChanged() diff --git a/common/changereplay.h b/common/changereplay.h index 6c1c1db..88d6ce3 100644 --- a/common/changereplay.h +++ b/common/changereplay.h @@ -52,12 +52,15 @@ public slots: protected: virtual KAsync::Job replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; + virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; Sink::Storage mStorage; private: + void recordReplayedRevision(qint64 revision); KAsync::Job replayNextRevision(); Sink::Storage mChangeReplayStore; bool mReplayInProgress; + Sink::Storage::Transaction mMainStoreTransaction; }; class NullChangeReplay : public ChangeReplay @@ -65,6 +68,7 @@ class NullChangeReplay : public ChangeReplay public: NullChangeReplay(const QByteArray &resourceName) : ChangeReplay(resourceName) {} KAsync::Job replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE { return KAsync::null(); } + bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE { return false; } }; } diff --git a/common/sourcewriteback.cpp b/common/sourcewriteback.cpp index 7d21ea6..fe996cb 100644 --- a/common/sourcewriteback.cpp +++ b/common/sourcewriteback.cpp @@ -55,18 +55,26 @@ RemoteIdMap &SourceWriteBack::syncStore() return *mSyncStore; } -KAsync::Job SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) +bool SourceWriteBack::canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) { - SinkTrace() << "Replaying" << type << key; - Sink::EntityBuffer buffer(value); const Sink::Entity &entity = buffer.entity(); const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); Q_ASSERT(metadataBuffer); if (!metadataBuffer->replayToSource()) { SinkTrace() << "Change is coming from the source"; - return KAsync::null(); } + return metadataBuffer->replayToSource(); +} + +KAsync::Job SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) +{ + SinkTrace() << "Replaying" << type << key; + + Sink::EntityBuffer buffer(value); + const Sink::Entity &entity = buffer.entity(); + const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); + Q_ASSERT(metadataBuffer); Q_ASSERT(!mSyncStore); Q_ASSERT(!mEntityStore); Q_ASSERT(!mTransaction); diff --git a/common/sourcewriteback.h b/common/sourcewriteback.h index 8531ff5..8031573 100644 --- a/common/sourcewriteback.h +++ b/common/sourcewriteback.h @@ -39,6 +39,7 @@ public: protected: ///Base implementation calls the replay$Type calls virtual KAsync::Job replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; + virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; protected: ///Implement to write back changes to the server -- cgit v1.2.3