diff options
-rw-r--r-- | common/genericresource.cpp | 5 | ||||
-rw-r--r-- | common/genericresource.h | 2 | ||||
-rw-r--r-- | common/listener.cpp | 48 | ||||
-rw-r--r-- | common/listener.h | 10 | ||||
-rw-r--r-- | common/pipeline.cpp | 17 | ||||
-rw-r--r-- | common/pipeline.h | 4 |
6 files changed, 38 insertions, 48 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 2b9e7b2..ed7dd46 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -240,7 +240,7 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra | |||
240 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) | 240 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) |
241 | { | 241 | { |
242 | mPipeline->setResourceType(mResourceType); | 242 | mPipeline->setResourceType(mResourceType); |
243 | mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue); | 243 | mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue)); |
244 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { | 244 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { |
245 | flatbuffers::Verifier verifier((const uint8_t *)command, size); | 245 | flatbuffers::Verifier verifier((const uint8_t *)command, size); |
246 | if (Sink::Commands::VerifyInspectionBuffer(verifier)) { | 246 | if (Sink::Commands::VerifyInspectionBuffer(verifier)) { |
@@ -280,7 +280,7 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra | |||
280 | return KAsync::error<void>(-1, "Invalid inspection command."); | 280 | return KAsync::error<void>(-1, "Invalid inspection command."); |
281 | }); | 281 | }); |
282 | { | 282 | { |
283 | auto ret =QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | 283 | auto ret =QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); |
284 | Q_ASSERT(ret); | 284 | Q_ASSERT(ret); |
285 | } | 285 | } |
286 | { | 286 | { |
@@ -296,7 +296,6 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra | |||
296 | 296 | ||
297 | GenericResource::~GenericResource() | 297 | GenericResource::~GenericResource() |
298 | { | 298 | { |
299 | delete mProcessor; | ||
300 | } | 299 | } |
301 | 300 | ||
302 | KAsync::Job<void> GenericResource::inspect( | 301 | KAsync::Job<void> GenericResource::inspect( |
diff --git a/common/genericresource.h b/common/genericresource.h index 0878968..2254172 100644 --- a/common/genericresource.h +++ b/common/genericresource.h | |||
@@ -77,7 +77,7 @@ protected: | |||
77 | QSharedPointer<Pipeline> mPipeline; | 77 | QSharedPointer<Pipeline> mPipeline; |
78 | 78 | ||
79 | private: | 79 | private: |
80 | CommandProcessor *mProcessor; | 80 | std::unique_ptr<CommandProcessor> mProcessor; |
81 | QSharedPointer<ChangeReplay> mChangeReplay; | 81 | QSharedPointer<ChangeReplay> mChangeReplay; |
82 | QSharedPointer<Synchronizer> mSynchronizer; | 82 | QSharedPointer<Synchronizer> mSynchronizer; |
83 | int mError; | 83 | int mError; |
diff --git a/common/listener.cpp b/common/listener.cpp index 32c57ac..af8eaa2 100644 --- a/common/listener.cpp +++ b/common/listener.cpp | |||
@@ -47,11 +47,10 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArra | |||
47 | m_server(new QLocalServer(this)), | 47 | m_server(new QLocalServer(this)), |
48 | m_resourceName(resourceType), | 48 | m_resourceName(resourceType), |
49 | m_resourceInstanceIdentifier(resourceInstanceIdentifier), | 49 | m_resourceInstanceIdentifier(resourceInstanceIdentifier), |
50 | m_resource(0), | ||
51 | m_clientBufferProcessesTimer(new QTimer(this)), | 50 | m_clientBufferProcessesTimer(new QTimer(this)), |
52 | m_messageId(0) | 51 | m_messageId(0) |
53 | { | 52 | { |
54 | connect(m_server, &QLocalServer::newConnection, this, &Listener::acceptConnection); | 53 | connect(m_server.get(), &QLocalServer::newConnection, this, &Listener::acceptConnection); |
55 | Trace() << "Trying to open " << m_resourceInstanceIdentifier; | 54 | Trace() << "Trying to open " << m_resourceInstanceIdentifier; |
56 | 55 | ||
57 | if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { | 56 | if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { |
@@ -66,10 +65,10 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArra | |||
66 | Log() << QString("Listening on %1").arg(m_server->serverName()); | 65 | Log() << QString("Listening on %1").arg(m_server->serverName()); |
67 | } | 66 | } |
68 | 67 | ||
69 | m_checkConnectionsTimer = new QTimer; | 68 | m_checkConnectionsTimer = std::unique_ptr<QTimer>(new QTimer); |
70 | m_checkConnectionsTimer->setSingleShot(true); | 69 | m_checkConnectionsTimer->setSingleShot(true); |
71 | m_checkConnectionsTimer->setInterval(1000); | 70 | m_checkConnectionsTimer->setInterval(1000); |
72 | connect(m_checkConnectionsTimer, &QTimer::timeout, [this]() { | 71 | connect(m_checkConnectionsTimer.get(), &QTimer::timeout, [this]() { |
73 | if (m_connections.isEmpty()) { | 72 | if (m_connections.isEmpty()) { |
74 | Log() << QString("No connections, shutting down."); | 73 | Log() << QString("No connections, shutting down."); |
75 | quit(); | 74 | quit(); |
@@ -80,16 +79,12 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArra | |||
80 | // or even just drop down to invoking the method queued? => invoke queued unless we need throttling | 79 | // or even just drop down to invoking the method queued? => invoke queued unless we need throttling |
81 | m_clientBufferProcessesTimer->setInterval(0); | 80 | m_clientBufferProcessesTimer->setInterval(0); |
82 | m_clientBufferProcessesTimer->setSingleShot(true); | 81 | m_clientBufferProcessesTimer->setSingleShot(true); |
83 | connect(m_clientBufferProcessesTimer, &QTimer::timeout, this, &Listener::processClientBuffers); | 82 | connect(m_clientBufferProcessesTimer.get(), &QTimer::timeout, this, &Listener::processClientBuffers); |
84 | } | 83 | } |
85 | 84 | ||
86 | Listener::~Listener() | 85 | Listener::~Listener() |
87 | { | 86 | { |
88 | closeAllConnections(); | 87 | closeAllConnections(); |
89 | delete m_resource; | ||
90 | delete m_checkConnectionsTimer; | ||
91 | delete m_clientBufferProcessesTimer; | ||
92 | delete m_server; | ||
93 | } | 88 | } |
94 | 89 | ||
95 | void Listener::emergencyAbortAllConnections() | 90 | void Listener::emergencyAbortAllConnections() |
@@ -140,7 +135,7 @@ void Listener::acceptConnection() | |||
140 | 135 | ||
141 | // If this is the first client, set the lower limit for revision cleanup | 136 | // If this is the first client, set the lower limit for revision cleanup |
142 | if (m_connections.size() == 1) { | 137 | if (m_connections.size() == 1) { |
143 | loadResource()->setLowerBoundRevision(0); | 138 | loadResource().setLowerBoundRevision(0); |
144 | } | 139 | } |
145 | 140 | ||
146 | if (socket->bytesAvailable()) { | 141 | if (socket->bytesAvailable()) { |
@@ -177,7 +172,7 @@ void Listener::checkConnections() | |||
177 | { | 172 | { |
178 | // If this was the last client, disengage the lower limit for revision cleanup | 173 | // If this was the last client, disengage the lower limit for revision cleanup |
179 | if (m_connections.isEmpty()) { | 174 | if (m_connections.isEmpty()) { |
180 | loadResource()->setLowerBoundRevision(std::numeric_limits<qint64>::max()); | 175 | loadResource().setLowerBoundRevision(std::numeric_limits<qint64>::max()); |
181 | } | 176 | } |
182 | m_checkConnectionsTimer->start(); | 177 | m_checkConnectionsTimer->start(); |
183 | } | 178 | } |
@@ -249,10 +244,10 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c | |||
249 | timer->start(); | 244 | timer->start(); |
250 | auto job = KAsync::null<void>(); | 245 | auto job = KAsync::null<void>(); |
251 | if (buffer->sourceSync()) { | 246 | if (buffer->sourceSync()) { |
252 | job = loadResource()->synchronizeWithSource(); | 247 | job = loadResource().synchronizeWithSource(); |
253 | } | 248 | } |
254 | if (buffer->localSync()) { | 249 | if (buffer->localSync()) { |
255 | job = job.then<void>(loadResource()->processAllMessages()); | 250 | job = job.then<void>(loadResource().processAllMessages()); |
256 | } | 251 | } |
257 | job.then<void>([callback, timer]() { | 252 | job.then<void>([callback, timer]() { |
258 | Trace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); | 253 | Trace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); |
@@ -274,7 +269,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c | |||
274 | case Sink::Commands::ModifyEntityCommand: | 269 | case Sink::Commands::ModifyEntityCommand: |
275 | case Sink::Commands::CreateEntityCommand: | 270 | case Sink::Commands::CreateEntityCommand: |
276 | Trace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name; | 271 | Trace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name; |
277 | loadResource()->processCommand(commandId, commandBuffer); | 272 | loadResource().processCommand(commandId, commandBuffer); |
278 | break; | 273 | break; |
279 | case Sink::Commands::ShutdownCommand: | 274 | case Sink::Commands::ShutdownCommand: |
280 | Log() << QString("Received shutdown command from %1").arg(client.name); | 275 | Log() << QString("Received shutdown command from %1").arg(client.name); |
@@ -294,20 +289,19 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c | |||
294 | } else { | 289 | } else { |
295 | Warning() << "received invalid command"; | 290 | Warning() << "received invalid command"; |
296 | } | 291 | } |
297 | loadResource()->setLowerBoundRevision(lowerBoundRevision()); | 292 | loadResource().setLowerBoundRevision(lowerBoundRevision()); |
298 | } break; | 293 | } break; |
299 | case Sink::Commands::RemoveFromDiskCommand: { | 294 | case Sink::Commands::RemoveFromDiskCommand: { |
300 | Log() << QString("Received a remove from disk command from %1").arg(client.name); | 295 | Log() << QString("Received a remove from disk command from %1").arg(client.name); |
301 | delete m_resource; | 296 | m_resource.reset(nullptr); |
302 | m_resource = nullptr; | 297 | loadResource().removeDataFromDisk(); |
303 | loadResource()->removeDataFromDisk(); | ||
304 | m_server->close(); | 298 | m_server->close(); |
305 | QTimer::singleShot(0, this, &Listener::quit); | 299 | QTimer::singleShot(0, this, &Listener::quit); |
306 | } break; | 300 | } break; |
307 | default: | 301 | default: |
308 | if (commandId > Sink::Commands::CustomCommand) { | 302 | if (commandId > Sink::Commands::CustomCommand) { |
309 | Log() << QString("Received custom command from %1: ").arg(client.name) << commandId; | 303 | Log() << QString("Received custom command from %1: ").arg(client.name) << commandId; |
310 | loadResource()->processCommand(commandId, commandBuffer); | 304 | loadResource().processCommand(commandId, commandBuffer); |
311 | } else { | 305 | } else { |
312 | success = false; | 306 | success = false; |
313 | ErrorMsg() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; | 307 | ErrorMsg() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; |
@@ -437,25 +431,25 @@ void Listener::notify(const Sink::Notification ¬ification) | |||
437 | m_fbb.Clear(); | 431 | m_fbb.Clear(); |
438 | } | 432 | } |
439 | 433 | ||
440 | Sink::Resource *Listener::loadResource() | 434 | Sink::Resource &Listener::loadResource() |
441 | { | 435 | { |
442 | if (!m_resource) { | 436 | if (!m_resource) { |
443 | if (Sink::ResourceFactory *resourceFactory = Sink::ResourceFactory::load(m_resourceName)) { | 437 | if (Sink::ResourceFactory *resourceFactory = Sink::ResourceFactory::load(m_resourceName)) { |
444 | m_resource = resourceFactory->createResource(m_resourceInstanceIdentifier); | 438 | m_resource = std::unique_ptr<Sink::Resource>(resourceFactory->createResource(m_resourceInstanceIdentifier)); |
445 | if (!m_resource) { | 439 | if (!m_resource) { |
446 | ErrorMsg() << "Failed to instantiate the resource " << m_resourceName; | 440 | ErrorMsg() << "Failed to instantiate the resource " << m_resourceName; |
447 | m_resource = new Sink::Resource; | 441 | m_resource = std::unique_ptr<Sink::Resource>(new Sink::Resource); |
448 | } | 442 | } |
449 | Trace() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); | 443 | Trace() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); |
450 | Trace() << QString("\tResource: %1").arg((qlonglong)m_resource); | 444 | Trace() << QString("\tResource: %1").arg((qlonglong)m_resource.get()); |
451 | connect(m_resource, &Sink::Resource::revisionUpdated, this, &Listener::refreshRevision); | 445 | connect(m_resource.get(), &Sink::Resource::revisionUpdated, this, &Listener::refreshRevision); |
452 | connect(m_resource, &Sink::Resource::notify, this, &Listener::notify); | 446 | connect(m_resource.get(), &Sink::Resource::notify, this, &Listener::notify); |
453 | } else { | 447 | } else { |
454 | ErrorMsg() << "Failed to load resource " << m_resourceName; | 448 | ErrorMsg() << "Failed to load resource " << m_resourceName; |
455 | m_resource = new Sink::Resource; | 449 | m_resource = std::unique_ptr<Sink::Resource>(new Sink::Resource); |
456 | } | 450 | } |
457 | } | 451 | } |
458 | return m_resource; | 452 | return *m_resource; |
459 | } | 453 | } |
460 | 454 | ||
461 | #pragma clang diagnostic push | 455 | #pragma clang diagnostic push |
diff --git a/common/listener.h b/common/listener.h index 5e376c7..67d76e9 100644 --- a/common/listener.h +++ b/common/listener.h | |||
@@ -81,17 +81,17 @@ private: | |||
81 | bool processClientBuffer(Client &client); | 81 | bool processClientBuffer(Client &client); |
82 | void sendCommandCompleted(QLocalSocket *socket, uint messageId, bool success); | 82 | void sendCommandCompleted(QLocalSocket *socket, uint messageId, bool success); |
83 | void updateClientsWithRevision(qint64); | 83 | void updateClientsWithRevision(qint64); |
84 | Sink::Resource *loadResource(); | 84 | Sink::Resource &loadResource(); |
85 | void readFromSocket(QLocalSocket *socket); | 85 | void readFromSocket(QLocalSocket *socket); |
86 | qint64 lowerBoundRevision(); | 86 | qint64 lowerBoundRevision(); |
87 | 87 | ||
88 | QLocalServer *m_server; | 88 | std::unique_ptr<QLocalServer> m_server; |
89 | QVector<Client> m_connections; | 89 | QVector<Client> m_connections; |
90 | flatbuffers::FlatBufferBuilder m_fbb; | 90 | flatbuffers::FlatBufferBuilder m_fbb; |
91 | const QByteArray m_resourceName; | 91 | const QByteArray m_resourceName; |
92 | const QByteArray m_resourceInstanceIdentifier; | 92 | const QByteArray m_resourceInstanceIdentifier; |
93 | Sink::Resource *m_resource; | 93 | std::unique_ptr<Sink::Resource> m_resource; |
94 | QTimer *m_clientBufferProcessesTimer; | 94 | std::unique_ptr<QTimer> m_clientBufferProcessesTimer; |
95 | QTimer *m_checkConnectionsTimer; | 95 | std::unique_ptr<QTimer> m_checkConnectionsTimer; |
96 | int m_messageId; | 96 | int m_messageId; |
97 | }; | 97 | }; |
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index feceb77..034f913 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -52,7 +52,7 @@ public: | |||
52 | 52 | ||
53 | Storage storage; | 53 | Storage storage; |
54 | Storage::Transaction transaction; | 54 | Storage::Transaction transaction; |
55 | QHash<QString, QVector<Preprocessor *>> processors; | 55 | QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors; |
56 | bool revisionChanged; | 56 | bool revisionChanged; |
57 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); | 57 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); |
58 | QTime transactionTime; | 58 | QTime transactionTime; |
@@ -80,18 +80,16 @@ Pipeline::Pipeline(const QString &resourceName, QObject *parent) : QObject(paren | |||
80 | Pipeline::~Pipeline() | 80 | Pipeline::~Pipeline() |
81 | { | 81 | { |
82 | d->transaction = Storage::Transaction(); | 82 | d->transaction = Storage::Transaction(); |
83 | for (const auto &t : d->processors.keys()) { | ||
84 | qDeleteAll(d->processors.value(t)); | ||
85 | } | ||
86 | delete d; | ||
87 | } | 83 | } |
88 | 84 | ||
89 | void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) | 85 | void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) |
90 | { | 86 | { |
87 | auto &list = d->processors[entityType]; | ||
88 | list.clear(); | ||
91 | for (auto p : processors) { | 89 | for (auto p : processors) { |
92 | p->setup(d->resourceType, d->resourceInstanceIdentifier, this); | 90 | p->setup(d->resourceType, d->resourceInstanceIdentifier, this); |
91 | list.append(QSharedPointer<Preprocessor>(p)); | ||
93 | } | 92 | } |
94 | d->processors[entityType] = processors; | ||
95 | } | 93 | } |
96 | 94 | ||
97 | void Pipeline::setResourceType(const QByteArray &resourceType) | 95 | void Pipeline::setResourceType(const QByteArray &resourceType) |
@@ -216,7 +214,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
216 | 214 | ||
217 | auto adaptor = adaptorFactory->createAdaptor(*entity); | 215 | auto adaptor = adaptorFactory->createAdaptor(*entity); |
218 | auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); | 216 | auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); |
219 | for (auto processor : d->processors[bufferType]) { | 217 | foreach (const auto &processor, d->processors[bufferType]) { |
220 | processor->newEntity(key, Storage::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction); | 218 | processor->newEntity(key, Storage::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction); |
221 | } | 219 | } |
222 | //The maxRevision may have changed meanwhile if the entity created sub-entities | 220 | //The maxRevision may have changed meanwhile if the entity created sub-entities |
@@ -325,7 +323,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
325 | } | 323 | } |
326 | 324 | ||
327 | newAdaptor->resetChangedProperties(); | 325 | newAdaptor->resetChangedProperties(); |
328 | for (auto processor : d->processors[bufferType]) { | 326 | foreach (const auto &processor, d->processors[bufferType]) { |
329 | processor->modifiedEntity(key, Storage::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction); | 327 | processor->modifiedEntity(key, Storage::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction); |
330 | } | 328 | } |
331 | //The maxRevision may have changed meanwhile if the entity created sub-entities | 329 | //The maxRevision may have changed meanwhile if the entity created sub-entities |
@@ -432,7 +430,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
432 | 430 | ||
433 | d->storeNewRevision(newRevision, fbb, bufferType, key); | 431 | d->storeNewRevision(newRevision, fbb, bufferType, key); |
434 | 432 | ||
435 | for (auto processor : d->processors[bufferType]) { | 433 | foreach (const auto &processor, d->processors[bufferType]) { |
436 | processor->deletedEntity(key, newRevision, *current, d->transaction); | 434 | processor->deletedEntity(key, newRevision, *current, d->transaction); |
437 | } | 435 | } |
438 | 436 | ||
@@ -485,7 +483,6 @@ Preprocessor::Preprocessor() : d(new Preprocessor::Private) | |||
485 | 483 | ||
486 | Preprocessor::~Preprocessor() | 484 | Preprocessor::~Preprocessor() |
487 | { | 485 | { |
488 | delete d; | ||
489 | } | 486 | } |
490 | 487 | ||
491 | void Preprocessor::setup(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Pipeline *pipeline) | 488 | void Preprocessor::setup(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Pipeline *pipeline) |
diff --git a/common/pipeline.h b/common/pipeline.h index d04d795..ef89cf0 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -72,7 +72,7 @@ signals: | |||
72 | 72 | ||
73 | private: | 73 | private: |
74 | class Private; | 74 | class Private; |
75 | Private *const d; | 75 | const std::unique_ptr<Private> d; |
76 | }; | 76 | }; |
77 | 77 | ||
78 | class SINK_EXPORT Preprocessor | 78 | class SINK_EXPORT Preprocessor |
@@ -103,7 +103,7 @@ protected: | |||
103 | private: | 103 | private: |
104 | friend class Pipeline; | 104 | friend class Pipeline; |
105 | class Private; | 105 | class Private; |
106 | Private *const d; | 106 | const std::unique_ptr<Private> d; |
107 | }; | 107 | }; |
108 | 108 | ||
109 | template<typename DomainType> | 109 | template<typename DomainType> |