diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-10-16 14:55:20 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-10-21 09:02:21 +0200 |
commit | 237b9ae4113e7a9f489632296941becb71afdb45 (patch) | |
tree | 01cde58f495944f01cad9d282391d4efd2897141 /common/genericresource.cpp | |
parent | 95d11bf0be98a4e3c08502fe23417b800233ce14 (diff) | |
download | sink-237b9ae4113e7a9f489632296941becb71afdb45.tar.gz sink-237b9ae4113e7a9f489632296941becb71afdb45.zip |
Refactor how the storage is used.
This is the initial refactoring to improve how we deal with the storage.
It does a couple of things:
* Rename Sink::Storage to Sink::Storage::DataStore to free up the
Sink::Storage namespace
* Introduce a Sink::ResourceContext to have a single object that can be
passed around containing everything that is necessary to operate on a
resource. This is a lot better than the multiple separate parameters
that we used to pass around all over the place, while still allowing
for dependency injection for tests.
* Tie storage access together using the new EntityStore that directly
works with ApplicationDomainTypes. This gives us a central place where
main storage, indexes and buffer adaptors are tied together, which
will also give us a place to implement external indexes, such as a
fulltextindex using xapian.
* Use ApplicationDomainTypes as the default way to pass around entities.
Instead of using various ways to pass around entities (buffers,
buffer adaptors, ApplicationDomainTypes), only use a single way.
The old approach was confusing, and was only done as:
* optimization; really shouldn't be necessary and otherwise I'm sure
we can find better ways to optimize ApplicationDomainType itself.
* a way to account for entities that have multiple buffers, a concept
that I no longer deem relevant.
While this commit does the bulk of the work to get there, the following
commits will refactor more stuff to get things back to normal.
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r-- | common/genericresource.cpp | 33 |
1 files changed, 16 insertions, 17 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index ef6edc8..e0d395a 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -45,6 +45,7 @@ static int sBatchSize = 100; | |||
45 | static int sCommitInterval = 10; | 45 | static int sCommitInterval = 10; |
46 | 46 | ||
47 | using namespace Sink; | 47 | using namespace Sink; |
48 | using namespace Sink::Storage; | ||
48 | 49 | ||
49 | /** | 50 | /** |
50 | * Drives the pipeline using the output from all command queues | 51 | * Drives the pipeline using the output from all command queues |
@@ -58,7 +59,7 @@ class CommandProcessor : public QObject | |||
58 | public: | 59 | public: |
59 | CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) | 60 | CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) |
60 | { | 61 | { |
61 | mLowerBoundRevision = Storage::maxRevision(mPipeline->storage().createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | 62 | mLowerBoundRevision = DataStore::maxRevision(mPipeline->storage().createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { |
62 | SinkWarning() << error.message; | 63 | SinkWarning() << error.message; |
63 | })); | 64 | })); |
64 | 65 | ||
@@ -226,17 +227,15 @@ private: | |||
226 | InspectionFunction mInspect; | 227 | InspectionFunction mInspect; |
227 | }; | 228 | }; |
228 | 229 | ||
229 | GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline ) | 230 | GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline ) |
230 | : Sink::Resource(), | 231 | : Sink::Resource(), |
231 | mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), | 232 | mResourceContext(resourceContext), |
232 | mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), | 233 | mUserQueue(Sink::storageLocation(), resourceContext.instanceId() + ".userqueue"), |
233 | mResourceType(resourceType), | 234 | mSynchronizerQueue(Sink::storageLocation(), resourceContext.instanceId() + ".synchronizerqueue"), |
234 | mResourceInstanceIdentifier(resourceInstanceIdentifier), | 235 | mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceContext)), |
235 | mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)), | ||
236 | mError(0), | 236 | mError(0), |
237 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) | 237 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) |
238 | { | 238 | { |
239 | mPipeline->setResourceType(mResourceType); | ||
240 | mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue)); | 239 | mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue)); |
241 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { | 240 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { |
242 | flatbuffers::Verifier verifier((const uint8_t *)command, size); | 241 | flatbuffers::Verifier verifier((const uint8_t *)command, size); |
@@ -357,19 +356,19 @@ void GenericResource::setupChangereplay(const QSharedPointer<ChangeReplay> &chan | |||
357 | 356 | ||
358 | void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) | 357 | void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) |
359 | { | 358 | { |
360 | Sink::Storage(Sink::storageLocation(), instanceIdentifier, Sink::Storage::ReadWrite).removeFromDisk(); | 359 | Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadWrite).removeFromDisk(); |
361 | Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::ReadWrite).removeFromDisk(); | 360 | Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); |
362 | Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::ReadWrite).removeFromDisk(); | 361 | Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); |
363 | Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::ReadWrite).removeFromDisk(); | 362 | Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); |
364 | Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::ReadWrite).removeFromDisk(); | 363 | Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); |
365 | } | 364 | } |
366 | 365 | ||
367 | qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier) | 366 | qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier) |
368 | { | 367 | { |
369 | auto size = Sink::Storage(Sink::storageLocation(), instanceIdentifier, Sink::Storage::ReadOnly).diskUsage(); | 368 | auto size = Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadOnly).diskUsage(); |
370 | size += Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::ReadOnly).diskUsage(); | 369 | size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::DataStore::ReadOnly).diskUsage(); |
371 | size += Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::ReadOnly).diskUsage(); | 370 | size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::DataStore::ReadOnly).diskUsage(); |
372 | size += Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::ReadOnly).diskUsage(); | 371 | size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::DataStore::ReadOnly).diskUsage(); |
373 | return size; | 372 | return size; |
374 | } | 373 | } |
375 | 374 | ||