diff options
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r-- | common/genericresource.cpp | 33 |
1 files changed, 16 insertions, 17 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index ef6edc8..e0d395a 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -45,6 +45,7 @@ static int sBatchSize = 100; | |||
45 | static int sCommitInterval = 10; | 45 | static int sCommitInterval = 10; |
46 | 46 | ||
47 | using namespace Sink; | 47 | using namespace Sink; |
48 | using namespace Sink::Storage; | ||
48 | 49 | ||
49 | /** | 50 | /** |
50 | * Drives the pipeline using the output from all command queues | 51 | * Drives the pipeline using the output from all command queues |
@@ -58,7 +59,7 @@ class CommandProcessor : public QObject | |||
58 | public: | 59 | public: |
59 | CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) | 60 | CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) |
60 | { | 61 | { |
61 | mLowerBoundRevision = Storage::maxRevision(mPipeline->storage().createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | 62 | mLowerBoundRevision = DataStore::maxRevision(mPipeline->storage().createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { |
62 | SinkWarning() << error.message; | 63 | SinkWarning() << error.message; |
63 | })); | 64 | })); |
64 | 65 | ||
@@ -226,17 +227,15 @@ private: | |||
226 | InspectionFunction mInspect; | 227 | InspectionFunction mInspect; |
227 | }; | 228 | }; |
228 | 229 | ||
229 | GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline ) | 230 | GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline ) |
230 | : Sink::Resource(), | 231 | : Sink::Resource(), |
231 | mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), | 232 | mResourceContext(resourceContext), |
232 | mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), | 233 | mUserQueue(Sink::storageLocation(), resourceContext.instanceId() + ".userqueue"), |
233 | mResourceType(resourceType), | 234 | mSynchronizerQueue(Sink::storageLocation(), resourceContext.instanceId() + ".synchronizerqueue"), |
234 | mResourceInstanceIdentifier(resourceInstanceIdentifier), | 235 | mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceContext)), |
235 | mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)), | ||
236 | mError(0), | 236 | mError(0), |
237 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) | 237 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) |
238 | { | 238 | { |
239 | mPipeline->setResourceType(mResourceType); | ||
240 | mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue)); | 239 | mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue)); |
241 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { | 240 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { |
242 | flatbuffers::Verifier verifier((const uint8_t *)command, size); | 241 | flatbuffers::Verifier verifier((const uint8_t *)command, size); |
@@ -357,19 +356,19 @@ void GenericResource::setupChangereplay(const QSharedPointer<ChangeReplay> &chan | |||
357 | 356 | ||
358 | void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) | 357 | void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) |
359 | { | 358 | { |
360 | Sink::Storage(Sink::storageLocation(), instanceIdentifier, Sink::Storage::ReadWrite).removeFromDisk(); | 359 | Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadWrite).removeFromDisk(); |
361 | Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::ReadWrite).removeFromDisk(); | 360 | Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); |
362 | Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::ReadWrite).removeFromDisk(); | 361 | Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); |
363 | Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::ReadWrite).removeFromDisk(); | 362 | Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); |
364 | Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::ReadWrite).removeFromDisk(); | 363 | Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); |
365 | } | 364 | } |
366 | 365 | ||
367 | qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier) | 366 | qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier) |
368 | { | 367 | { |
369 | auto size = Sink::Storage(Sink::storageLocation(), instanceIdentifier, Sink::Storage::ReadOnly).diskUsage(); | 368 | auto size = Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadOnly).diskUsage(); |
370 | size += Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::ReadOnly).diskUsage(); | 369 | size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::DataStore::ReadOnly).diskUsage(); |
371 | size += Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::ReadOnly).diskUsage(); | 370 | size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::DataStore::ReadOnly).diskUsage(); |
372 | size += Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::ReadOnly).diskUsage(); | 371 | size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::DataStore::ReadOnly).diskUsage(); |
373 | return size; | 372 | return size; |
374 | } | 373 | } |
375 | 374 | ||