summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2018-01-30 11:27:33 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2018-01-30 15:28:55 +0100
commit33029ab13cdb378e2b4a0886a591fb02a5cb2b65 (patch)
tree1ffffda4f125fd4989856a9610d5843f5d097135 /common
parentf675a280ad48a9a2ba7b38f81cf0dfdafb3a96b5 (diff)
downloadsink-33029ab13cdb378e2b4a0886a591fb02a5cb2b65.tar.gz
sink-33029ab13cdb378e2b4a0886a591fb02a5cb2b65.zip
Support for storage upgrades
Diffstat (limited to 'common')
-rw-r--r--common/commands.cpp4
-rw-r--r--common/commands.h1
-rw-r--r--common/definitions.cpp6
-rw-r--r--common/definitions.h1
-rw-r--r--common/genericresource.cpp27
-rw-r--r--common/genericresource.h1
-rw-r--r--common/listener.cpp11
-rw-r--r--common/listener.h2
-rw-r--r--common/resource.cpp5
-rw-r--r--common/resource.h1
-rw-r--r--common/storage.h3
-rw-r--r--common/storage_common.cpp24
-rw-r--r--common/store.cpp24
13 files changed, 107 insertions, 3 deletions
diff --git a/common/commands.cpp b/common/commands.cpp
index 24f2017..a92b455 100644
--- a/common/commands.cpp
+++ b/common/commands.cpp
@@ -63,6 +63,10 @@ QByteArray name(int commandId)
63 return "RemoveFromDisk"; 63 return "RemoveFromDisk";
64 case FlushCommand: 64 case FlushCommand:
65 return "Flush"; 65 return "Flush";
66 case SecretCommand:
67 return "Secret";
68 case UpgradeCommand:
69 return "Upgrade";
66 case CustomCommand: 70 case CustomCommand:
67 return "Custom"; 71 return "Custom";
68 }; 72 };
diff --git a/common/commands.h b/common/commands.h
index 8ced268..9ca92a3 100644
--- a/common/commands.h
+++ b/common/commands.h
@@ -49,6 +49,7 @@ enum CommandIds
49 RemoveFromDiskCommand, 49 RemoveFromDiskCommand,
50 FlushCommand, 50 FlushCommand,
51 SecretCommand, 51 SecretCommand,
52 UpgradeCommand,
52 CustomCommand = 0xffff 53 CustomCommand = 0xffff
53}; 54};
54 55
diff --git a/common/definitions.cpp b/common/definitions.cpp
index ee18d52..1f4c0cf 100644
--- a/common/definitions.cpp
+++ b/common/definitions.cpp
@@ -86,3 +86,9 @@ QString Sink::resourceStorageLocation(const QByteArray &resourceInstanceIdentifi
86{ 86{
87 return storageLocation() + "/" + resourceInstanceIdentifier + "/data"; 87 return storageLocation() + "/" + resourceInstanceIdentifier + "/data";
88} 88}
89
90
91qint64 Sink::latestDatabaseVersion()
92{
93 return 0;
94}
diff --git a/common/definitions.h b/common/definitions.h
index 7ef215b..ce424eb 100644
--- a/common/definitions.h
+++ b/common/definitions.h
@@ -30,6 +30,7 @@ QString SINK_EXPORT dataLocation();
30QString SINK_EXPORT configLocation(); 30QString SINK_EXPORT configLocation();
31QString SINK_EXPORT temporaryFileLocation(); 31QString SINK_EXPORT temporaryFileLocation();
32QString SINK_EXPORT resourceStorageLocation(const QByteArray &resourceInstanceIdentifier); 32QString SINK_EXPORT resourceStorageLocation(const QByteArray &resourceInstanceIdentifier);
33qint64 SINK_EXPORT latestDatabaseVersion();
33 34
34/** 35/**
35 * Clear the location cache and lookup locations again. 36 * Clear the location cache and lookup locations again.
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 00d7d0c..8d27e04 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -56,6 +56,33 @@ void GenericResource::setSecret(const QString &s)
56 } 56 }
57} 57}
58 58
59bool GenericResource::checkForUpgrade()
60{
61 const auto currentDatabaseVersion = [&] {
62 auto store = Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId(), Sink::Storage::DataStore::ReadOnly);
63 return Storage::DataStore::databaseVersion(store.createTransaction(Storage::DataStore::ReadOnly));
64 }();
65 if (currentDatabaseVersion < Sink::latestDatabaseVersion()) {
66 SinkLog() << "Starting database upgrade from " << currentDatabaseVersion << " to " << Sink::latestDatabaseVersion();
67
68 //Right now upgrading just means removing all local storage so we will resync
69 Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId(), Sink::Storage::DataStore::ReadWrite).removeFromDisk();
70 Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId() + ".userqueue", Sink::Storage::DataStore::ReadWrite).removeFromDisk();
71 Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronizerqueue", Sink::Storage::DataStore::ReadWrite).removeFromDisk();
72 Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId() + ".changereplay", Sink::Storage::DataStore::ReadWrite).removeFromDisk();
73 Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::ReadWrite).removeFromDisk();
74
75 {
76 auto store = Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId(), Sink::Storage::DataStore::ReadWrite);
77 auto t = store.createTransaction(Storage::DataStore::ReadWrite);
78 Storage::DataStore::setDatabaseVersion(t, Sink::latestDatabaseVersion());
79 }
80 SinkLog() << "Finished database upgrade to " << Sink::latestDatabaseVersion();
81 return true;
82 }
83 return false;
84}
85
59void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors) 86void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors)
60{ 87{
61 mPipeline->setPreprocessors(type, preprocessors); 88 mPipeline->setPreprocessors(type, preprocessors);
diff --git a/common/genericresource.h b/common/genericresource.h
index edcd7d2..11ede0c 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -48,6 +48,7 @@ public:
48 static qint64 diskUsage(const QByteArray &instanceIdentifier); 48 static qint64 diskUsage(const QByteArray &instanceIdentifier);
49 49
50 virtual void setSecret(const QString &s) Q_DECL_OVERRIDE; 50 virtual void setSecret(const QString &s) Q_DECL_OVERRIDE;
51 virtual bool checkForUpgrade() Q_DECL_OVERRIDE;
51 52
52 //TODO Remove this API, it's only used in tests 53 //TODO Remove this API, it's only used in tests
53 KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query); 54 KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query);
diff --git a/common/listener.cpp b/common/listener.cpp
index c6747cd..9922c48 100644
--- a/common/listener.cpp
+++ b/common/listener.cpp
@@ -87,6 +87,14 @@ Listener::~Listener()
87 closeAllConnections(); 87 closeAllConnections();
88} 88}
89 89
90void Listener::checkForUpgrade()
91{
92 if (loadResource().checkForUpgrade()) {
93 //Close the resource to ensure no transactions are open
94 m_resource.reset(nullptr);
95 }
96}
97
90void Listener::emergencyAbortAllConnections() 98void Listener::emergencyAbortAllConnections()
91{ 99{
92 Sink::Notification n; 100 Sink::Notification n;
@@ -289,6 +297,9 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
289 } 297 }
290 m_exiting = true; 298 m_exiting = true;
291 } break; 299 } break;
300 case Sink::Commands::UpgradeCommand:
301 //Because we synchronously run the update directly on resource start, we know that the upgrade is complete once this message completes.
302 break;
292 default: 303 default:
293 if (commandId > Sink::Commands::CustomCommand) { 304 if (commandId > Sink::Commands::CustomCommand) {
294 SinkLog() << QString("Received custom command from %1: ").arg(client.name) << commandId; 305 SinkLog() << QString("Received custom command from %1: ").arg(client.name) << commandId;
diff --git a/common/listener.h b/common/listener.h
index f29130d..38e87da 100644
--- a/common/listener.h
+++ b/common/listener.h
@@ -59,6 +59,8 @@ public:
59 Listener(const QByteArray &resourceName, const QByteArray &resourceType, QObject *parent = 0); 59 Listener(const QByteArray &resourceName, const QByteArray &resourceType, QObject *parent = 0);
60 ~Listener(); 60 ~Listener();
61 61
62 void checkForUpgrade();
63
62signals: 64signals:
63 void noClients(); 65 void noClients();
64 66
diff --git a/common/resource.cpp b/common/resource.cpp
index 78cc51b..96aaded 100644
--- a/common/resource.cpp
+++ b/common/resource.cpp
@@ -56,6 +56,11 @@ void Resource::setSecret(const QString &s)
56 Q_UNUSED(s) 56 Q_UNUSED(s)
57} 57}
58 58
59bool Resource::checkForUpgrade()
60{
61 return false;
62}
63
59 64
60class ResourceFactory::Private 65class ResourceFactory::Private
61{ 66{
diff --git a/common/resource.h b/common/resource.h
index 938f267..1b0b388 100644
--- a/common/resource.h
+++ b/common/resource.h
@@ -48,6 +48,7 @@ public:
48 virtual void setLowerBoundRevision(qint64 revision); 48 virtual void setLowerBoundRevision(qint64 revision);
49 49
50 virtual void setSecret(const QString &s); 50 virtual void setSecret(const QString &s);
51 virtual bool checkForUpgrade();
51 52
52signals: 53signals:
53 void revisionUpdated(qint64); 54 void revisionUpdated(qint64);
diff --git a/common/storage.h b/common/storage.h
index 1967a5e..f5f1879 100644
--- a/common/storage.h
+++ b/common/storage.h
@@ -235,6 +235,9 @@ public:
235 235
236 static QByteArray generateUid(); 236 static QByteArray generateUid();
237 237
238 static qint64 databaseVersion(const Transaction &);
239 static void setDatabaseVersion(Transaction &, qint64 revision);
240
238private: 241private:
239 std::function<void(const DataStore::Error &error)> mErrorHandler; 242 std::function<void(const DataStore::Error &error)> mErrorHandler;
240 243
diff --git a/common/storage_common.cpp b/common/storage_common.cpp
index 630dae9..830eff2 100644
--- a/common/storage_common.cpp
+++ b/common/storage_common.cpp
@@ -233,9 +233,31 @@ bool DataStore::NamedDatabase::contains(const QByteArray &uid)
233 found = true; 233 found = true;
234 return false; 234 return false;
235 }, 235 },
236 [this](const DataStore::Error &error) {}, true); 236 [](const DataStore::Error &error) {}, true);
237 return found; 237 return found;
238} 238}
239 239
240void DataStore::setDatabaseVersion(DataStore::Transaction &transaction, qint64 revision)
241{
242 transaction.openDatabase().write("__internal_databaseVersion", QByteArray::number(revision));
243}
244
245qint64 DataStore::databaseVersion(const DataStore::Transaction &transaction)
246{
247 qint64 r = 0;
248 transaction.openDatabase().scan("__internal_databaseVersion",
249 [&](const QByteArray &, const QByteArray &revision) -> bool {
250 r = revision.toLongLong();
251 return false;
252 },
253 [](const Error &error) {
254 if (error.code != DataStore::NotFound) {
255 SinkWarning() << "Couldn't find the database version: " << error;
256 }
257 });
258 return r;
259}
260
261
240} 262}
241} // namespace Sink 263} // namespace Sink
diff --git a/common/store.cpp b/common/store.cpp
index b16fa4e..2fa62d6 100644
--- a/common/store.cpp
+++ b/common/store.cpp
@@ -302,13 +302,33 @@ KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier)
302 }); 302 });
303} 303}
304 304
305static KAsync::Job<void> upgrade(const QByteArray &resource)
306{
307 SinkLog() << "Upgrading " << resource;
308 auto store = Sink::Storage::DataStore(Sink::storageLocation(), resource, Sink::Storage::DataStore::ReadOnly);
309 if (Storage::DataStore::databaseVersion(store.createTransaction(Storage::DataStore::ReadOnly)) >= Sink::latestDatabaseVersion()) {
310 return KAsync::null();
311 }
312
313 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource));
314 return resourceAccess->sendCommand(Sink::Commands::UpgradeCommand)
315 .addToContext(resourceAccess)
316 .then([=](const KAsync::Error &error) {
317 if (error) {
318 SinkWarning() << "Error during upgrade.";
319 return KAsync::error(error);
320 }
321 SinkTrace() << "Upgrade of resource " << resource << " complete.";
322 return KAsync::null();
323 });
324}
325
305KAsync::Job<void> Store::upgrade() 326KAsync::Job<void> Store::upgrade()
306{ 327{
307 SinkLog() << "Upgrading..."; 328 SinkLog() << "Upgrading...";
308 return fetchAll<ApplicationDomain::SinkResource>({}) 329 return fetchAll<ApplicationDomain::SinkResource>({})
309 .template each([](const ApplicationDomain::SinkResource::Ptr &resource) -> KAsync::Job<void> { 330 .template each([](const ApplicationDomain::SinkResource::Ptr &resource) -> KAsync::Job<void> {
310 SinkLog() << "Removing caches for " << resource->identifier(); 331 return Sink::upgrade(resource->identifier());
311 return removeDataFromDisk(resource->identifier());
312 }) 332 })
313 .then([] { 333 .then([] {
314 SinkLog() << "Upgrade complete."; 334 SinkLog() << "Upgrade complete.";