diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2018-01-30 11:27:33 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2018-01-30 15:28:55 +0100 |
commit | 33029ab13cdb378e2b4a0886a591fb02a5cb2b65 (patch) | |
tree | 1ffffda4f125fd4989856a9610d5843f5d097135 /common | |
parent | f675a280ad48a9a2ba7b38f81cf0dfdafb3a96b5 (diff) | |
download | sink-33029ab13cdb378e2b4a0886a591fb02a5cb2b65.tar.gz sink-33029ab13cdb378e2b4a0886a591fb02a5cb2b65.zip |
Support for storage upgrades
Diffstat (limited to 'common')
-rw-r--r-- | common/commands.cpp | 4 | ||||
-rw-r--r-- | common/commands.h | 1 | ||||
-rw-r--r-- | common/definitions.cpp | 6 | ||||
-rw-r--r-- | common/definitions.h | 1 | ||||
-rw-r--r-- | common/genericresource.cpp | 27 | ||||
-rw-r--r-- | common/genericresource.h | 1 | ||||
-rw-r--r-- | common/listener.cpp | 11 | ||||
-rw-r--r-- | common/listener.h | 2 | ||||
-rw-r--r-- | common/resource.cpp | 5 | ||||
-rw-r--r-- | common/resource.h | 1 | ||||
-rw-r--r-- | common/storage.h | 3 | ||||
-rw-r--r-- | common/storage_common.cpp | 24 | ||||
-rw-r--r-- | common/store.cpp | 24 |
13 files changed, 107 insertions, 3 deletions
diff --git a/common/commands.cpp b/common/commands.cpp index 24f2017..a92b455 100644 --- a/common/commands.cpp +++ b/common/commands.cpp | |||
@@ -63,6 +63,10 @@ QByteArray name(int commandId) | |||
63 | return "RemoveFromDisk"; | 63 | return "RemoveFromDisk"; |
64 | case FlushCommand: | 64 | case FlushCommand: |
65 | return "Flush"; | 65 | return "Flush"; |
66 | case SecretCommand: | ||
67 | return "Secret"; | ||
68 | case UpgradeCommand: | ||
69 | return "Upgrade"; | ||
66 | case CustomCommand: | 70 | case CustomCommand: |
67 | return "Custom"; | 71 | return "Custom"; |
68 | }; | 72 | }; |
diff --git a/common/commands.h b/common/commands.h index 8ced268..9ca92a3 100644 --- a/common/commands.h +++ b/common/commands.h | |||
@@ -49,6 +49,7 @@ enum CommandIds | |||
49 | RemoveFromDiskCommand, | 49 | RemoveFromDiskCommand, |
50 | FlushCommand, | 50 | FlushCommand, |
51 | SecretCommand, | 51 | SecretCommand, |
52 | UpgradeCommand, | ||
52 | CustomCommand = 0xffff | 53 | CustomCommand = 0xffff |
53 | }; | 54 | }; |
54 | 55 | ||
diff --git a/common/definitions.cpp b/common/definitions.cpp index ee18d52..1f4c0cf 100644 --- a/common/definitions.cpp +++ b/common/definitions.cpp | |||
@@ -86,3 +86,9 @@ QString Sink::resourceStorageLocation(const QByteArray &resourceInstanceIdentifi | |||
86 | { | 86 | { |
87 | return storageLocation() + "/" + resourceInstanceIdentifier + "/data"; | 87 | return storageLocation() + "/" + resourceInstanceIdentifier + "/data"; |
88 | } | 88 | } |
89 | |||
90 | |||
91 | qint64 Sink::latestDatabaseVersion() | ||
92 | { | ||
93 | return 0; | ||
94 | } | ||
diff --git a/common/definitions.h b/common/definitions.h index 7ef215b..ce424eb 100644 --- a/common/definitions.h +++ b/common/definitions.h | |||
@@ -30,6 +30,7 @@ QString SINK_EXPORT dataLocation(); | |||
30 | QString SINK_EXPORT configLocation(); | 30 | QString SINK_EXPORT configLocation(); |
31 | QString SINK_EXPORT temporaryFileLocation(); | 31 | QString SINK_EXPORT temporaryFileLocation(); |
32 | QString SINK_EXPORT resourceStorageLocation(const QByteArray &resourceInstanceIdentifier); | 32 | QString SINK_EXPORT resourceStorageLocation(const QByteArray &resourceInstanceIdentifier); |
33 | qint64 SINK_EXPORT latestDatabaseVersion(); | ||
33 | 34 | ||
34 | /** | 35 | /** |
35 | * Clear the location cache and lookup locations again. | 36 | * Clear the location cache and lookup locations again. |
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 00d7d0c..8d27e04 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -56,6 +56,33 @@ void GenericResource::setSecret(const QString &s) | |||
56 | } | 56 | } |
57 | } | 57 | } |
58 | 58 | ||
59 | bool GenericResource::checkForUpgrade() | ||
60 | { | ||
61 | const auto currentDatabaseVersion = [&] { | ||
62 | auto store = Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId(), Sink::Storage::DataStore::ReadOnly); | ||
63 | return Storage::DataStore::databaseVersion(store.createTransaction(Storage::DataStore::ReadOnly)); | ||
64 | }(); | ||
65 | if (currentDatabaseVersion < Sink::latestDatabaseVersion()) { | ||
66 | SinkLog() << "Starting database upgrade from " << currentDatabaseVersion << " to " << Sink::latestDatabaseVersion(); | ||
67 | |||
68 | //Right now upgrading just means removing all local storage so we will resync | ||
69 | Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId(), Sink::Storage::DataStore::ReadWrite).removeFromDisk(); | ||
70 | Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId() + ".userqueue", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); | ||
71 | Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronizerqueue", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); | ||
72 | Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId() + ".changereplay", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); | ||
73 | Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); | ||
74 | |||
75 | { | ||
76 | auto store = Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId(), Sink::Storage::DataStore::ReadWrite); | ||
77 | auto t = store.createTransaction(Storage::DataStore::ReadWrite); | ||
78 | Storage::DataStore::setDatabaseVersion(t, Sink::latestDatabaseVersion()); | ||
79 | } | ||
80 | SinkLog() << "Finished database upgrade to " << Sink::latestDatabaseVersion(); | ||
81 | return true; | ||
82 | } | ||
83 | return false; | ||
84 | } | ||
85 | |||
59 | void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors) | 86 | void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors) |
60 | { | 87 | { |
61 | mPipeline->setPreprocessors(type, preprocessors); | 88 | mPipeline->setPreprocessors(type, preprocessors); |
diff --git a/common/genericresource.h b/common/genericresource.h index edcd7d2..11ede0c 100644 --- a/common/genericresource.h +++ b/common/genericresource.h | |||
@@ -48,6 +48,7 @@ public: | |||
48 | static qint64 diskUsage(const QByteArray &instanceIdentifier); | 48 | static qint64 diskUsage(const QByteArray &instanceIdentifier); |
49 | 49 | ||
50 | virtual void setSecret(const QString &s) Q_DECL_OVERRIDE; | 50 | virtual void setSecret(const QString &s) Q_DECL_OVERRIDE; |
51 | virtual bool checkForUpgrade() Q_DECL_OVERRIDE; | ||
51 | 52 | ||
52 | //TODO Remove this API, it's only used in tests | 53 | //TODO Remove this API, it's only used in tests |
53 | KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query); | 54 | KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query); |
diff --git a/common/listener.cpp b/common/listener.cpp index c6747cd..9922c48 100644 --- a/common/listener.cpp +++ b/common/listener.cpp | |||
@@ -87,6 +87,14 @@ Listener::~Listener() | |||
87 | closeAllConnections(); | 87 | closeAllConnections(); |
88 | } | 88 | } |
89 | 89 | ||
90 | void Listener::checkForUpgrade() | ||
91 | { | ||
92 | if (loadResource().checkForUpgrade()) { | ||
93 | //Close the resource to ensure no transactions are open | ||
94 | m_resource.reset(nullptr); | ||
95 | } | ||
96 | } | ||
97 | |||
90 | void Listener::emergencyAbortAllConnections() | 98 | void Listener::emergencyAbortAllConnections() |
91 | { | 99 | { |
92 | Sink::Notification n; | 100 | Sink::Notification n; |
@@ -289,6 +297,9 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c | |||
289 | } | 297 | } |
290 | m_exiting = true; | 298 | m_exiting = true; |
291 | } break; | 299 | } break; |
300 | case Sink::Commands::UpgradeCommand: | ||
301 | //Because we synchronously run the update directly on resource start, we know that the upgrade is complete once this message completes. | ||
302 | break; | ||
292 | default: | 303 | default: |
293 | if (commandId > Sink::Commands::CustomCommand) { | 304 | if (commandId > Sink::Commands::CustomCommand) { |
294 | SinkLog() << QString("Received custom command from %1: ").arg(client.name) << commandId; | 305 | SinkLog() << QString("Received custom command from %1: ").arg(client.name) << commandId; |
diff --git a/common/listener.h b/common/listener.h index f29130d..38e87da 100644 --- a/common/listener.h +++ b/common/listener.h | |||
@@ -59,6 +59,8 @@ public: | |||
59 | Listener(const QByteArray &resourceName, const QByteArray &resourceType, QObject *parent = 0); | 59 | Listener(const QByteArray &resourceName, const QByteArray &resourceType, QObject *parent = 0); |
60 | ~Listener(); | 60 | ~Listener(); |
61 | 61 | ||
62 | void checkForUpgrade(); | ||
63 | |||
62 | signals: | 64 | signals: |
63 | void noClients(); | 65 | void noClients(); |
64 | 66 | ||
diff --git a/common/resource.cpp b/common/resource.cpp index 78cc51b..96aaded 100644 --- a/common/resource.cpp +++ b/common/resource.cpp | |||
@@ -56,6 +56,11 @@ void Resource::setSecret(const QString &s) | |||
56 | Q_UNUSED(s) | 56 | Q_UNUSED(s) |
57 | } | 57 | } |
58 | 58 | ||
59 | bool Resource::checkForUpgrade() | ||
60 | { | ||
61 | return false; | ||
62 | } | ||
63 | |||
59 | 64 | ||
60 | class ResourceFactory::Private | 65 | class ResourceFactory::Private |
61 | { | 66 | { |
diff --git a/common/resource.h b/common/resource.h index 938f267..1b0b388 100644 --- a/common/resource.h +++ b/common/resource.h | |||
@@ -48,6 +48,7 @@ public: | |||
48 | virtual void setLowerBoundRevision(qint64 revision); | 48 | virtual void setLowerBoundRevision(qint64 revision); |
49 | 49 | ||
50 | virtual void setSecret(const QString &s); | 50 | virtual void setSecret(const QString &s); |
51 | virtual bool checkForUpgrade(); | ||
51 | 52 | ||
52 | signals: | 53 | signals: |
53 | void revisionUpdated(qint64); | 54 | void revisionUpdated(qint64); |
diff --git a/common/storage.h b/common/storage.h index 1967a5e..f5f1879 100644 --- a/common/storage.h +++ b/common/storage.h | |||
@@ -235,6 +235,9 @@ public: | |||
235 | 235 | ||
236 | static QByteArray generateUid(); | 236 | static QByteArray generateUid(); |
237 | 237 | ||
238 | static qint64 databaseVersion(const Transaction &); | ||
239 | static void setDatabaseVersion(Transaction &, qint64 revision); | ||
240 | |||
238 | private: | 241 | private: |
239 | std::function<void(const DataStore::Error &error)> mErrorHandler; | 242 | std::function<void(const DataStore::Error &error)> mErrorHandler; |
240 | 243 | ||
diff --git a/common/storage_common.cpp b/common/storage_common.cpp index 630dae9..830eff2 100644 --- a/common/storage_common.cpp +++ b/common/storage_common.cpp | |||
@@ -233,9 +233,31 @@ bool DataStore::NamedDatabase::contains(const QByteArray &uid) | |||
233 | found = true; | 233 | found = true; |
234 | return false; | 234 | return false; |
235 | }, | 235 | }, |
236 | [this](const DataStore::Error &error) {}, true); | 236 | [](const DataStore::Error &error) {}, true); |
237 | return found; | 237 | return found; |
238 | } | 238 | } |
239 | 239 | ||
240 | void DataStore::setDatabaseVersion(DataStore::Transaction &transaction, qint64 revision) | ||
241 | { | ||
242 | transaction.openDatabase().write("__internal_databaseVersion", QByteArray::number(revision)); | ||
243 | } | ||
244 | |||
245 | qint64 DataStore::databaseVersion(const DataStore::Transaction &transaction) | ||
246 | { | ||
247 | qint64 r = 0; | ||
248 | transaction.openDatabase().scan("__internal_databaseVersion", | ||
249 | [&](const QByteArray &, const QByteArray &revision) -> bool { | ||
250 | r = revision.toLongLong(); | ||
251 | return false; | ||
252 | }, | ||
253 | [](const Error &error) { | ||
254 | if (error.code != DataStore::NotFound) { | ||
255 | SinkWarning() << "Couldn't find the database version: " << error; | ||
256 | } | ||
257 | }); | ||
258 | return r; | ||
259 | } | ||
260 | |||
261 | |||
240 | } | 262 | } |
241 | } // namespace Sink | 263 | } // namespace Sink |
diff --git a/common/store.cpp b/common/store.cpp index b16fa4e..2fa62d6 100644 --- a/common/store.cpp +++ b/common/store.cpp | |||
@@ -302,13 +302,33 @@ KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier) | |||
302 | }); | 302 | }); |
303 | } | 303 | } |
304 | 304 | ||
305 | static KAsync::Job<void> upgrade(const QByteArray &resource) | ||
306 | { | ||
307 | SinkLog() << "Upgrading " << resource; | ||
308 | auto store = Sink::Storage::DataStore(Sink::storageLocation(), resource, Sink::Storage::DataStore::ReadOnly); | ||
309 | if (Storage::DataStore::databaseVersion(store.createTransaction(Storage::DataStore::ReadOnly)) >= Sink::latestDatabaseVersion()) { | ||
310 | return KAsync::null(); | ||
311 | } | ||
312 | |||
313 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); | ||
314 | return resourceAccess->sendCommand(Sink::Commands::UpgradeCommand) | ||
315 | .addToContext(resourceAccess) | ||
316 | .then([=](const KAsync::Error &error) { | ||
317 | if (error) { | ||
318 | SinkWarning() << "Error during upgrade."; | ||
319 | return KAsync::error(error); | ||
320 | } | ||
321 | SinkTrace() << "Upgrade of resource " << resource << " complete."; | ||
322 | return KAsync::null(); | ||
323 | }); | ||
324 | } | ||
325 | |||
305 | KAsync::Job<void> Store::upgrade() | 326 | KAsync::Job<void> Store::upgrade() |
306 | { | 327 | { |
307 | SinkLog() << "Upgrading..."; | 328 | SinkLog() << "Upgrading..."; |
308 | return fetchAll<ApplicationDomain::SinkResource>({}) | 329 | return fetchAll<ApplicationDomain::SinkResource>({}) |
309 | .template each([](const ApplicationDomain::SinkResource::Ptr &resource) -> KAsync::Job<void> { | 330 | .template each([](const ApplicationDomain::SinkResource::Ptr &resource) -> KAsync::Job<void> { |
310 | SinkLog() << "Removing caches for " << resource->identifier(); | 331 | return Sink::upgrade(resource->identifier()); |
311 | return removeDataFromDisk(resource->identifier()); | ||
312 | }) | 332 | }) |
313 | .then([] { | 333 | .then([] { |
314 | SinkLog() << "Upgrade complete."; | 334 | SinkLog() << "Upgrade complete."; |