summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp33
1 files changed, 16 insertions, 17 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index ef6edc8..e0d395a 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -45,6 +45,7 @@ static int sBatchSize = 100;
45static int sCommitInterval = 10; 45static int sCommitInterval = 10;
46 46
47using namespace Sink; 47using namespace Sink;
48using namespace Sink::Storage;
48 49
49/** 50/**
50 * Drives the pipeline using the output from all command queues 51 * Drives the pipeline using the output from all command queues
@@ -58,7 +59,7 @@ class CommandProcessor : public QObject
58public: 59public:
59 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) 60 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false)
60 { 61 {
61 mLowerBoundRevision = Storage::maxRevision(mPipeline->storage().createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 62 mLowerBoundRevision = DataStore::maxRevision(mPipeline->storage().createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) {
62 SinkWarning() << error.message; 63 SinkWarning() << error.message;
63 })); 64 }));
64 65
@@ -226,17 +227,15 @@ private:
226 InspectionFunction mInspect; 227 InspectionFunction mInspect;
227}; 228};
228 229
229GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline ) 230GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline )
230 : Sink::Resource(), 231 : Sink::Resource(),
231 mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), 232 mResourceContext(resourceContext),
232 mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), 233 mUserQueue(Sink::storageLocation(), resourceContext.instanceId() + ".userqueue"),
233 mResourceType(resourceType), 234 mSynchronizerQueue(Sink::storageLocation(), resourceContext.instanceId() + ".synchronizerqueue"),
234 mResourceInstanceIdentifier(resourceInstanceIdentifier), 235 mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceContext)),
235 mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)),
236 mError(0), 236 mError(0),
237 mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) 237 mClientLowerBoundRevision(std::numeric_limits<qint64>::max())
238{ 238{
239 mPipeline->setResourceType(mResourceType);
240 mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue)); 239 mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue));
241 mProcessor->setInspectionCommand([this](void const *command, size_t size) { 240 mProcessor->setInspectionCommand([this](void const *command, size_t size) {
242 flatbuffers::Verifier verifier((const uint8_t *)command, size); 241 flatbuffers::Verifier verifier((const uint8_t *)command, size);
@@ -357,19 +356,19 @@ void GenericResource::setupChangereplay(const QSharedPointer<ChangeReplay> &chan
357 356
358void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) 357void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier)
359{ 358{
360 Sink::Storage(Sink::storageLocation(), instanceIdentifier, Sink::Storage::ReadWrite).removeFromDisk(); 359 Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadWrite).removeFromDisk();
361 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::ReadWrite).removeFromDisk(); 360 Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::DataStore::ReadWrite).removeFromDisk();
362 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::ReadWrite).removeFromDisk(); 361 Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::DataStore::ReadWrite).removeFromDisk();
363 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::ReadWrite).removeFromDisk(); 362 Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::DataStore::ReadWrite).removeFromDisk();
364 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::ReadWrite).removeFromDisk(); 363 Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::DataStore::ReadWrite).removeFromDisk();
365} 364}
366 365
367qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier) 366qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier)
368{ 367{
369 auto size = Sink::Storage(Sink::storageLocation(), instanceIdentifier, Sink::Storage::ReadOnly).diskUsage(); 368 auto size = Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadOnly).diskUsage();
370 size += Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::ReadOnly).diskUsage(); 369 size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::DataStore::ReadOnly).diskUsage();
371 size += Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::ReadOnly).diskUsage(); 370 size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::DataStore::ReadOnly).diskUsage();
372 size += Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::ReadOnly).diskUsage(); 371 size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::DataStore::ReadOnly).diskUsage();
373 return size; 372 return size;
374} 373}
375 374