From 33029ab13cdb378e2b4a0886a591fb02a5cb2b65 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 30 Jan 2018 11:27:33 +0100 Subject: Support for storage upgrades --- common/commands.cpp | 4 ++++ common/commands.h | 1 + common/definitions.cpp | 6 ++++++ common/definitions.h | 1 + common/genericresource.cpp | 27 +++++++++++++++++++++++++++ common/genericresource.h | 1 + common/listener.cpp | 11 +++++++++++ common/listener.h | 2 ++ common/resource.cpp | 5 +++++ common/resource.h | 1 + common/storage.h | 3 +++ common/storage_common.cpp | 24 +++++++++++++++++++++++- common/store.cpp | 24 ++++++++++++++++++++++-- synchronizer/main.cpp | 1 + 14 files changed, 108 insertions(+), 3 deletions(-) diff --git a/common/commands.cpp b/common/commands.cpp index 24f2017..a92b455 100644 --- a/common/commands.cpp +++ b/common/commands.cpp @@ -63,6 +63,10 @@ QByteArray name(int commandId) return "RemoveFromDisk"; case FlushCommand: return "Flush"; + case SecretCommand: + return "Secret"; + case UpgradeCommand: + return "Upgrade"; case CustomCommand: return "Custom"; }; diff --git a/common/commands.h b/common/commands.h index 8ced268..9ca92a3 100644 --- a/common/commands.h +++ b/common/commands.h @@ -49,6 +49,7 @@ enum CommandIds RemoveFromDiskCommand, FlushCommand, SecretCommand, + UpgradeCommand, CustomCommand = 0xffff }; diff --git a/common/definitions.cpp b/common/definitions.cpp index ee18d52..1f4c0cf 100644 --- a/common/definitions.cpp +++ b/common/definitions.cpp @@ -86,3 +86,9 @@ QString Sink::resourceStorageLocation(const QByteArray &resourceInstanceIdentifi { return storageLocation() + "/" + resourceInstanceIdentifier + "/data"; } + + +qint64 Sink::latestDatabaseVersion() +{ + return 0; +} diff --git a/common/definitions.h b/common/definitions.h index 7ef215b..ce424eb 100644 --- a/common/definitions.h +++ b/common/definitions.h @@ -30,6 +30,7 @@ QString SINK_EXPORT dataLocation(); QString SINK_EXPORT configLocation(); QString SINK_EXPORT temporaryFileLocation(); QString SINK_EXPORT resourceStorageLocation(const QByteArray &resourceInstanceIdentifier); +qint64 SINK_EXPORT latestDatabaseVersion(); /** * Clear the location cache and lookup locations again. diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 00d7d0c..8d27e04 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -56,6 +56,33 @@ void GenericResource::setSecret(const QString &s) } } +bool GenericResource::checkForUpgrade() +{ + const auto currentDatabaseVersion = [&] { + auto store = Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId(), Sink::Storage::DataStore::ReadOnly); + return Storage::DataStore::databaseVersion(store.createTransaction(Storage::DataStore::ReadOnly)); + }(); + if (currentDatabaseVersion < Sink::latestDatabaseVersion()) { + SinkLog() << "Starting database upgrade from " << currentDatabaseVersion << " to " << Sink::latestDatabaseVersion(); + + //Right now upgrading just means removing all local storage so we will resync + Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId(), Sink::Storage::DataStore::ReadWrite).removeFromDisk(); + Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId() + ".userqueue", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); + Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronizerqueue", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); + Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId() + ".changereplay", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); + Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); + + { + auto store = Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId(), Sink::Storage::DataStore::ReadWrite); + auto t = store.createTransaction(Storage::DataStore::ReadWrite); + Storage::DataStore::setDatabaseVersion(t, Sink::latestDatabaseVersion()); + } + SinkLog() << "Finished database upgrade to " << Sink::latestDatabaseVersion(); + return true; + } + return false; +} + void GenericResource::setupPreprocessors(const QByteArray &type, const QVector &preprocessors) { mPipeline->setPreprocessors(type, preprocessors); diff --git a/common/genericresource.h b/common/genericresource.h index edcd7d2..11ede0c 100644 --- a/common/genericresource.h +++ b/common/genericresource.h @@ -48,6 +48,7 @@ public: static qint64 diskUsage(const QByteArray &instanceIdentifier); virtual void setSecret(const QString &s) Q_DECL_OVERRIDE; + virtual bool checkForUpgrade() Q_DECL_OVERRIDE; //TODO Remove this API, it's only used in tests KAsync::Job synchronizeWithSource(const Sink::QueryBase &query); diff --git a/common/listener.cpp b/common/listener.cpp index c6747cd..9922c48 100644 --- a/common/listener.cpp +++ b/common/listener.cpp @@ -87,6 +87,14 @@ Listener::~Listener() closeAllConnections(); } +void Listener::checkForUpgrade() +{ + if (loadResource().checkForUpgrade()) { + //Close the resource to ensure no transactions are open + m_resource.reset(nullptr); + } +} + void Listener::emergencyAbortAllConnections() { Sink::Notification n; @@ -289,6 +297,9 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c } m_exiting = true; } break; + case Sink::Commands::UpgradeCommand: + //Because we synchronously run the update directly on resource start, we know that the upgrade is complete once this message completes. + break; default: if (commandId > Sink::Commands::CustomCommand) { SinkLog() << QString("Received custom command from %1: ").arg(client.name) << commandId; diff --git a/common/listener.h b/common/listener.h index f29130d..38e87da 100644 --- a/common/listener.h +++ b/common/listener.h @@ -59,6 +59,8 @@ public: Listener(const QByteArray &resourceName, const QByteArray &resourceType, QObject *parent = 0); ~Listener(); + void checkForUpgrade(); + signals: void noClients(); diff --git a/common/resource.cpp b/common/resource.cpp index 78cc51b..96aaded 100644 --- a/common/resource.cpp +++ b/common/resource.cpp @@ -56,6 +56,11 @@ void Resource::setSecret(const QString &s) Q_UNUSED(s) } +bool Resource::checkForUpgrade() +{ + return false; +} + class ResourceFactory::Private { diff --git a/common/resource.h b/common/resource.h index 938f267..1b0b388 100644 --- a/common/resource.h +++ b/common/resource.h @@ -48,6 +48,7 @@ public: virtual void setLowerBoundRevision(qint64 revision); virtual void setSecret(const QString &s); + virtual bool checkForUpgrade(); signals: void revisionUpdated(qint64); diff --git a/common/storage.h b/common/storage.h index 1967a5e..f5f1879 100644 --- a/common/storage.h +++ b/common/storage.h @@ -235,6 +235,9 @@ public: static QByteArray generateUid(); + static qint64 databaseVersion(const Transaction &); + static void setDatabaseVersion(Transaction &, qint64 revision); + private: std::function mErrorHandler; diff --git a/common/storage_common.cpp b/common/storage_common.cpp index 630dae9..830eff2 100644 --- a/common/storage_common.cpp +++ b/common/storage_common.cpp @@ -233,9 +233,31 @@ bool DataStore::NamedDatabase::contains(const QByteArray &uid) found = true; return false; }, - [this](const DataStore::Error &error) {}, true); + [](const DataStore::Error &error) {}, true); return found; } +void DataStore::setDatabaseVersion(DataStore::Transaction &transaction, qint64 revision) +{ + transaction.openDatabase().write("__internal_databaseVersion", QByteArray::number(revision)); +} + +qint64 DataStore::databaseVersion(const DataStore::Transaction &transaction) +{ + qint64 r = 0; + transaction.openDatabase().scan("__internal_databaseVersion", + [&](const QByteArray &, const QByteArray &revision) -> bool { + r = revision.toLongLong(); + return false; + }, + [](const Error &error) { + if (error.code != DataStore::NotFound) { + SinkWarning() << "Couldn't find the database version: " << error; + } + }); + return r; +} + + } } // namespace Sink diff --git a/common/store.cpp b/common/store.cpp index b16fa4e..2fa62d6 100644 --- a/common/store.cpp +++ b/common/store.cpp @@ -302,13 +302,33 @@ KAsync::Job Store::removeDataFromDisk(const QByteArray &identifier) }); } +static KAsync::Job upgrade(const QByteArray &resource) +{ + SinkLog() << "Upgrading " << resource; + auto store = Sink::Storage::DataStore(Sink::storageLocation(), resource, Sink::Storage::DataStore::ReadOnly); + if (Storage::DataStore::databaseVersion(store.createTransaction(Storage::DataStore::ReadOnly)) >= Sink::latestDatabaseVersion()) { + return KAsync::null(); + } + + auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); + return resourceAccess->sendCommand(Sink::Commands::UpgradeCommand) + .addToContext(resourceAccess) + .then([=](const KAsync::Error &error) { + if (error) { + SinkWarning() << "Error during upgrade."; + return KAsync::error(error); + } + SinkTrace() << "Upgrade of resource " << resource << " complete."; + return KAsync::null(); + }); +} + KAsync::Job Store::upgrade() { SinkLog() << "Upgrading..."; return fetchAll({}) .template each([](const ApplicationDomain::SinkResource::Ptr &resource) -> KAsync::Job { - SinkLog() << "Removing caches for " << resource->identifier(); - return removeDataFromDisk(resource->identifier()); + return Sink::upgrade(resource->identifier()); }) .then([] { SinkLog() << "Upgrade complete."; diff --git a/synchronizer/main.cpp b/synchronizer/main.cpp index 3c41c67..8e99d54 100644 --- a/synchronizer/main.cpp +++ b/synchronizer/main.cpp @@ -216,6 +216,7 @@ int main(int argc, char *argv[]) } listener = new Listener(instanceIdentifier, resourceType, &app); + listener->checkForUpgrade(); QObject::connect(&app, &QCoreApplication::aboutToQuit, listener, &Listener::closeAllConnections); QObject::connect(listener, &Listener::noClients, &app, &QCoreApplication::quit); -- cgit v1.2.3