summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-10-16 14:55:20 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-10-21 09:02:21 +0200
commit237b9ae4113e7a9f489632296941becb71afdb45 (patch)
tree01cde58f495944f01cad9d282391d4efd2897141 /common/genericresource.cpp
parent95d11bf0be98a4e3c08502fe23417b800233ce14 (diff)
downloadsink-237b9ae4113e7a9f489632296941becb71afdb45.tar.gz
sink-237b9ae4113e7a9f489632296941becb71afdb45.zip
Refactor how the storage is used.
This is the initial refactoring to improve how we deal with the storage. It does a couple of things: * Rename Sink::Storage to Sink::Storage::DataStore to free up the Sink::Storage namespace * Introduce a Sink::ResourceContext to have a single object that can be passed around containing everything that is necessary to operate on a resource. This is a lot better than the multiple separate parameters that we used to pass around all over the place, while still allowing for dependency injection for tests. * Tie storage access together using the new EntityStore that directly works with ApplicationDomainTypes. This gives us a central place where main storage, indexes and buffer adaptors are tied together, which will also give us a place to implement external indexes, such as a fulltextindex using xapian. * Use ApplicationDomainTypes as the default way to pass around entities. Instead of using various ways to pass around entities (buffers, buffer adaptors, ApplicationDomainTypes), only use a single way. The old approach was confusing, and was only done as: * optimization; really shouldn't be necessary and otherwise I'm sure we can find better ways to optimize ApplicationDomainType itself. * a way to account for entities that have multiple buffers, a concept that I no longer deem relevant. While this commit does the bulk of the work to get there, the following commits will refactor more stuff to get things back to normal.
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp33
1 files changed, 16 insertions, 17 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index ef6edc8..e0d395a 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -45,6 +45,7 @@ static int sBatchSize = 100;
45static int sCommitInterval = 10; 45static int sCommitInterval = 10;
46 46
47using namespace Sink; 47using namespace Sink;
48using namespace Sink::Storage;
48 49
49/** 50/**
50 * Drives the pipeline using the output from all command queues 51 * Drives the pipeline using the output from all command queues
@@ -58,7 +59,7 @@ class CommandProcessor : public QObject
58public: 59public:
59 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) 60 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false)
60 { 61 {
61 mLowerBoundRevision = Storage::maxRevision(mPipeline->storage().createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 62 mLowerBoundRevision = DataStore::maxRevision(mPipeline->storage().createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) {
62 SinkWarning() << error.message; 63 SinkWarning() << error.message;
63 })); 64 }));
64 65
@@ -226,17 +227,15 @@ private:
226 InspectionFunction mInspect; 227 InspectionFunction mInspect;
227}; 228};
228 229
229GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline ) 230GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline )
230 : Sink::Resource(), 231 : Sink::Resource(),
231 mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), 232 mResourceContext(resourceContext),
232 mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), 233 mUserQueue(Sink::storageLocation(), resourceContext.instanceId() + ".userqueue"),
233 mResourceType(resourceType), 234 mSynchronizerQueue(Sink::storageLocation(), resourceContext.instanceId() + ".synchronizerqueue"),
234 mResourceInstanceIdentifier(resourceInstanceIdentifier), 235 mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceContext)),
235 mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)),
236 mError(0), 236 mError(0),
237 mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) 237 mClientLowerBoundRevision(std::numeric_limits<qint64>::max())
238{ 238{
239 mPipeline->setResourceType(mResourceType);
240 mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue)); 239 mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue));
241 mProcessor->setInspectionCommand([this](void const *command, size_t size) { 240 mProcessor->setInspectionCommand([this](void const *command, size_t size) {
242 flatbuffers::Verifier verifier((const uint8_t *)command, size); 241 flatbuffers::Verifier verifier((const uint8_t *)command, size);
@@ -357,19 +356,19 @@ void GenericResource::setupChangereplay(const QSharedPointer<ChangeReplay> &chan
357 356
358void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) 357void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier)
359{ 358{
360 Sink::Storage(Sink::storageLocation(), instanceIdentifier, Sink::Storage::ReadWrite).removeFromDisk(); 359 Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadWrite).removeFromDisk();
361 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::ReadWrite).removeFromDisk(); 360 Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::DataStore::ReadWrite).removeFromDisk();
362 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::ReadWrite).removeFromDisk(); 361 Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::DataStore::ReadWrite).removeFromDisk();
363 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::ReadWrite).removeFromDisk(); 362 Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::DataStore::ReadWrite).removeFromDisk();
364 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::ReadWrite).removeFromDisk(); 363 Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::DataStore::ReadWrite).removeFromDisk();
365} 364}
366 365
367qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier) 366qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier)
368{ 367{
369 auto size = Sink::Storage(Sink::storageLocation(), instanceIdentifier, Sink::Storage::ReadOnly).diskUsage(); 368 auto size = Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadOnly).diskUsage();
370 size += Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::ReadOnly).diskUsage(); 369 size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::DataStore::ReadOnly).diskUsage();
371 size += Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::ReadOnly).diskUsage(); 370 size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::DataStore::ReadOnly).diskUsage();
372 size += Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::ReadOnly).diskUsage(); 371 size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::DataStore::ReadOnly).diskUsage();
373 return size; 372 return size;
374} 373}
375 374