summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/genericresource.cpp5
-rw-r--r--common/genericresource.h2
-rw-r--r--common/listener.cpp48
-rw-r--r--common/listener.h10
-rw-r--r--common/pipeline.cpp17
-rw-r--r--common/pipeline.h4
6 files changed, 38 insertions, 48 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 2b9e7b2..ed7dd46 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -240,7 +240,7 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra
240 mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) 240 mClientLowerBoundRevision(std::numeric_limits<qint64>::max())
241{ 241{
242 mPipeline->setResourceType(mResourceType); 242 mPipeline->setResourceType(mResourceType);
243 mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue); 243 mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue));
244 mProcessor->setInspectionCommand([this](void const *command, size_t size) { 244 mProcessor->setInspectionCommand([this](void const *command, size_t size) {
245 flatbuffers::Verifier verifier((const uint8_t *)command, size); 245 flatbuffers::Verifier verifier((const uint8_t *)command, size);
246 if (Sink::Commands::VerifyInspectionBuffer(verifier)) { 246 if (Sink::Commands::VerifyInspectionBuffer(verifier)) {
@@ -280,7 +280,7 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra
280 return KAsync::error<void>(-1, "Invalid inspection command."); 280 return KAsync::error<void>(-1, "Invalid inspection command.");
281 }); 281 });
282 { 282 {
283 auto ret =QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 283 auto ret =QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
284 Q_ASSERT(ret); 284 Q_ASSERT(ret);
285 } 285 }
286 { 286 {
@@ -296,7 +296,6 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra
296 296
297GenericResource::~GenericResource() 297GenericResource::~GenericResource()
298{ 298{
299 delete mProcessor;
300} 299}
301 300
302KAsync::Job<void> GenericResource::inspect( 301KAsync::Job<void> GenericResource::inspect(
diff --git a/common/genericresource.h b/common/genericresource.h
index 0878968..2254172 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -77,7 +77,7 @@ protected:
77 QSharedPointer<Pipeline> mPipeline; 77 QSharedPointer<Pipeline> mPipeline;
78 78
79private: 79private:
80 CommandProcessor *mProcessor; 80 std::unique_ptr<CommandProcessor> mProcessor;
81 QSharedPointer<ChangeReplay> mChangeReplay; 81 QSharedPointer<ChangeReplay> mChangeReplay;
82 QSharedPointer<Synchronizer> mSynchronizer; 82 QSharedPointer<Synchronizer> mSynchronizer;
83 int mError; 83 int mError;
diff --git a/common/listener.cpp b/common/listener.cpp
index 32c57ac..af8eaa2 100644
--- a/common/listener.cpp
+++ b/common/listener.cpp
@@ -47,11 +47,10 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArra
47 m_server(new QLocalServer(this)), 47 m_server(new QLocalServer(this)),
48 m_resourceName(resourceType), 48 m_resourceName(resourceType),
49 m_resourceInstanceIdentifier(resourceInstanceIdentifier), 49 m_resourceInstanceIdentifier(resourceInstanceIdentifier),
50 m_resource(0),
51 m_clientBufferProcessesTimer(new QTimer(this)), 50 m_clientBufferProcessesTimer(new QTimer(this)),
52 m_messageId(0) 51 m_messageId(0)
53{ 52{
54 connect(m_server, &QLocalServer::newConnection, this, &Listener::acceptConnection); 53 connect(m_server.get(), &QLocalServer::newConnection, this, &Listener::acceptConnection);
55 Trace() << "Trying to open " << m_resourceInstanceIdentifier; 54 Trace() << "Trying to open " << m_resourceInstanceIdentifier;
56 55
57 if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { 56 if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) {
@@ -66,10 +65,10 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArra
66 Log() << QString("Listening on %1").arg(m_server->serverName()); 65 Log() << QString("Listening on %1").arg(m_server->serverName());
67 } 66 }
68 67
69 m_checkConnectionsTimer = new QTimer; 68 m_checkConnectionsTimer = std::unique_ptr<QTimer>(new QTimer);
70 m_checkConnectionsTimer->setSingleShot(true); 69 m_checkConnectionsTimer->setSingleShot(true);
71 m_checkConnectionsTimer->setInterval(1000); 70 m_checkConnectionsTimer->setInterval(1000);
72 connect(m_checkConnectionsTimer, &QTimer::timeout, [this]() { 71 connect(m_checkConnectionsTimer.get(), &QTimer::timeout, [this]() {
73 if (m_connections.isEmpty()) { 72 if (m_connections.isEmpty()) {
74 Log() << QString("No connections, shutting down."); 73 Log() << QString("No connections, shutting down.");
75 quit(); 74 quit();
@@ -80,16 +79,12 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArra
80 // or even just drop down to invoking the method queued? => invoke queued unless we need throttling 79 // or even just drop down to invoking the method queued? => invoke queued unless we need throttling
81 m_clientBufferProcessesTimer->setInterval(0); 80 m_clientBufferProcessesTimer->setInterval(0);
82 m_clientBufferProcessesTimer->setSingleShot(true); 81 m_clientBufferProcessesTimer->setSingleShot(true);
83 connect(m_clientBufferProcessesTimer, &QTimer::timeout, this, &Listener::processClientBuffers); 82 connect(m_clientBufferProcessesTimer.get(), &QTimer::timeout, this, &Listener::processClientBuffers);
84} 83}
85 84
86Listener::~Listener() 85Listener::~Listener()
87{ 86{
88 closeAllConnections(); 87 closeAllConnections();
89 delete m_resource;
90 delete m_checkConnectionsTimer;
91 delete m_clientBufferProcessesTimer;
92 delete m_server;
93} 88}
94 89
95void Listener::emergencyAbortAllConnections() 90void Listener::emergencyAbortAllConnections()
@@ -140,7 +135,7 @@ void Listener::acceptConnection()
140 135
141 // If this is the first client, set the lower limit for revision cleanup 136 // If this is the first client, set the lower limit for revision cleanup
142 if (m_connections.size() == 1) { 137 if (m_connections.size() == 1) {
143 loadResource()->setLowerBoundRevision(0); 138 loadResource().setLowerBoundRevision(0);
144 } 139 }
145 140
146 if (socket->bytesAvailable()) { 141 if (socket->bytesAvailable()) {
@@ -177,7 +172,7 @@ void Listener::checkConnections()
177{ 172{
178 // If this was the last client, disengage the lower limit for revision cleanup 173 // If this was the last client, disengage the lower limit for revision cleanup
179 if (m_connections.isEmpty()) { 174 if (m_connections.isEmpty()) {
180 loadResource()->setLowerBoundRevision(std::numeric_limits<qint64>::max()); 175 loadResource().setLowerBoundRevision(std::numeric_limits<qint64>::max());
181 } 176 }
182 m_checkConnectionsTimer->start(); 177 m_checkConnectionsTimer->start();
183} 178}
@@ -249,10 +244,10 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
249 timer->start(); 244 timer->start();
250 auto job = KAsync::null<void>(); 245 auto job = KAsync::null<void>();
251 if (buffer->sourceSync()) { 246 if (buffer->sourceSync()) {
252 job = loadResource()->synchronizeWithSource(); 247 job = loadResource().synchronizeWithSource();
253 } 248 }
254 if (buffer->localSync()) { 249 if (buffer->localSync()) {
255 job = job.then<void>(loadResource()->processAllMessages()); 250 job = job.then<void>(loadResource().processAllMessages());
256 } 251 }
257 job.then<void>([callback, timer]() { 252 job.then<void>([callback, timer]() {
258 Trace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); 253 Trace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed());
@@ -274,7 +269,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
274 case Sink::Commands::ModifyEntityCommand: 269 case Sink::Commands::ModifyEntityCommand:
275 case Sink::Commands::CreateEntityCommand: 270 case Sink::Commands::CreateEntityCommand:
276 Trace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name; 271 Trace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name;
277 loadResource()->processCommand(commandId, commandBuffer); 272 loadResource().processCommand(commandId, commandBuffer);
278 break; 273 break;
279 case Sink::Commands::ShutdownCommand: 274 case Sink::Commands::ShutdownCommand:
280 Log() << QString("Received shutdown command from %1").arg(client.name); 275 Log() << QString("Received shutdown command from %1").arg(client.name);
@@ -294,20 +289,19 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
294 } else { 289 } else {
295 Warning() << "received invalid command"; 290 Warning() << "received invalid command";
296 } 291 }
297 loadResource()->setLowerBoundRevision(lowerBoundRevision()); 292 loadResource().setLowerBoundRevision(lowerBoundRevision());
298 } break; 293 } break;
299 case Sink::Commands::RemoveFromDiskCommand: { 294 case Sink::Commands::RemoveFromDiskCommand: {
300 Log() << QString("Received a remove from disk command from %1").arg(client.name); 295 Log() << QString("Received a remove from disk command from %1").arg(client.name);
301 delete m_resource; 296 m_resource.reset(nullptr);
302 m_resource = nullptr; 297 loadResource().removeDataFromDisk();
303 loadResource()->removeDataFromDisk();
304 m_server->close(); 298 m_server->close();
305 QTimer::singleShot(0, this, &Listener::quit); 299 QTimer::singleShot(0, this, &Listener::quit);
306 } break; 300 } break;
307 default: 301 default:
308 if (commandId > Sink::Commands::CustomCommand) { 302 if (commandId > Sink::Commands::CustomCommand) {
309 Log() << QString("Received custom command from %1: ").arg(client.name) << commandId; 303 Log() << QString("Received custom command from %1: ").arg(client.name) << commandId;
310 loadResource()->processCommand(commandId, commandBuffer); 304 loadResource().processCommand(commandId, commandBuffer);
311 } else { 305 } else {
312 success = false; 306 success = false;
313 ErrorMsg() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; 307 ErrorMsg() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId;
@@ -437,25 +431,25 @@ void Listener::notify(const Sink::Notification &notification)
437 m_fbb.Clear(); 431 m_fbb.Clear();
438} 432}
439 433
440Sink::Resource *Listener::loadResource() 434Sink::Resource &Listener::loadResource()
441{ 435{
442 if (!m_resource) { 436 if (!m_resource) {
443 if (Sink::ResourceFactory *resourceFactory = Sink::ResourceFactory::load(m_resourceName)) { 437 if (Sink::ResourceFactory *resourceFactory = Sink::ResourceFactory::load(m_resourceName)) {
444 m_resource = resourceFactory->createResource(m_resourceInstanceIdentifier); 438 m_resource = std::unique_ptr<Sink::Resource>(resourceFactory->createResource(m_resourceInstanceIdentifier));
445 if (!m_resource) { 439 if (!m_resource) {
446 ErrorMsg() << "Failed to instantiate the resource " << m_resourceName; 440 ErrorMsg() << "Failed to instantiate the resource " << m_resourceName;
447 m_resource = new Sink::Resource; 441 m_resource = std::unique_ptr<Sink::Resource>(new Sink::Resource);
448 } 442 }
449 Trace() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); 443 Trace() << QString("Resource factory: %1").arg((qlonglong)resourceFactory);
450 Trace() << QString("\tResource: %1").arg((qlonglong)m_resource); 444 Trace() << QString("\tResource: %1").arg((qlonglong)m_resource.get());
451 connect(m_resource, &Sink::Resource::revisionUpdated, this, &Listener::refreshRevision); 445 connect(m_resource.get(), &Sink::Resource::revisionUpdated, this, &Listener::refreshRevision);
452 connect(m_resource, &Sink::Resource::notify, this, &Listener::notify); 446 connect(m_resource.get(), &Sink::Resource::notify, this, &Listener::notify);
453 } else { 447 } else {
454 ErrorMsg() << "Failed to load resource " << m_resourceName; 448 ErrorMsg() << "Failed to load resource " << m_resourceName;
455 m_resource = new Sink::Resource; 449 m_resource = std::unique_ptr<Sink::Resource>(new Sink::Resource);
456 } 450 }
457 } 451 }
458 return m_resource; 452 return *m_resource;
459} 453}
460 454
461#pragma clang diagnostic push 455#pragma clang diagnostic push
diff --git a/common/listener.h b/common/listener.h
index 5e376c7..67d76e9 100644
--- a/common/listener.h
+++ b/common/listener.h
@@ -81,17 +81,17 @@ private:
81 bool processClientBuffer(Client &client); 81 bool processClientBuffer(Client &client);
82 void sendCommandCompleted(QLocalSocket *socket, uint messageId, bool success); 82 void sendCommandCompleted(QLocalSocket *socket, uint messageId, bool success);
83 void updateClientsWithRevision(qint64); 83 void updateClientsWithRevision(qint64);
84 Sink::Resource *loadResource(); 84 Sink::Resource &loadResource();
85 void readFromSocket(QLocalSocket *socket); 85 void readFromSocket(QLocalSocket *socket);
86 qint64 lowerBoundRevision(); 86 qint64 lowerBoundRevision();
87 87
88 QLocalServer *m_server; 88 std::unique_ptr<QLocalServer> m_server;
89 QVector<Client> m_connections; 89 QVector<Client> m_connections;
90 flatbuffers::FlatBufferBuilder m_fbb; 90 flatbuffers::FlatBufferBuilder m_fbb;
91 const QByteArray m_resourceName; 91 const QByteArray m_resourceName;
92 const QByteArray m_resourceInstanceIdentifier; 92 const QByteArray m_resourceInstanceIdentifier;
93 Sink::Resource *m_resource; 93 std::unique_ptr<Sink::Resource> m_resource;
94 QTimer *m_clientBufferProcessesTimer; 94 std::unique_ptr<QTimer> m_clientBufferProcessesTimer;
95 QTimer *m_checkConnectionsTimer; 95 std::unique_ptr<QTimer> m_checkConnectionsTimer;
96 int m_messageId; 96 int m_messageId;
97}; 97};
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index feceb77..034f913 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -52,7 +52,7 @@ public:
52 52
53 Storage storage; 53 Storage storage;
54 Storage::Transaction transaction; 54 Storage::Transaction transaction;
55 QHash<QString, QVector<Preprocessor *>> processors; 55 QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors;
56 bool revisionChanged; 56 bool revisionChanged;
57 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); 57 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid);
58 QTime transactionTime; 58 QTime transactionTime;
@@ -80,18 +80,16 @@ Pipeline::Pipeline(const QString &resourceName, QObject *parent) : QObject(paren
80Pipeline::~Pipeline() 80Pipeline::~Pipeline()
81{ 81{
82 d->transaction = Storage::Transaction(); 82 d->transaction = Storage::Transaction();
83 for (const auto &t : d->processors.keys()) {
84 qDeleteAll(d->processors.value(t));
85 }
86 delete d;
87} 83}
88 84
89void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) 85void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors)
90{ 86{
87 auto &list = d->processors[entityType];
88 list.clear();
91 for (auto p : processors) { 89 for (auto p : processors) {
92 p->setup(d->resourceType, d->resourceInstanceIdentifier, this); 90 p->setup(d->resourceType, d->resourceInstanceIdentifier, this);
91 list.append(QSharedPointer<Preprocessor>(p));
93 } 92 }
94 d->processors[entityType] = processors;
95} 93}
96 94
97void Pipeline::setResourceType(const QByteArray &resourceType) 95void Pipeline::setResourceType(const QByteArray &resourceType)
@@ -216,7 +214,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
216 214
217 auto adaptor = adaptorFactory->createAdaptor(*entity); 215 auto adaptor = adaptorFactory->createAdaptor(*entity);
218 auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); 216 auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties());
219 for (auto processor : d->processors[bufferType]) { 217 foreach (const auto &processor, d->processors[bufferType]) {
220 processor->newEntity(key, Storage::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction); 218 processor->newEntity(key, Storage::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction);
221 } 219 }
222 //The maxRevision may have changed meanwhile if the entity created sub-entities 220 //The maxRevision may have changed meanwhile if the entity created sub-entities
@@ -325,7 +323,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
325 } 323 }
326 324
327 newAdaptor->resetChangedProperties(); 325 newAdaptor->resetChangedProperties();
328 for (auto processor : d->processors[bufferType]) { 326 foreach (const auto &processor, d->processors[bufferType]) {
329 processor->modifiedEntity(key, Storage::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction); 327 processor->modifiedEntity(key, Storage::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction);
330 } 328 }
331 //The maxRevision may have changed meanwhile if the entity created sub-entities 329 //The maxRevision may have changed meanwhile if the entity created sub-entities
@@ -432,7 +430,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
432 430
433 d->storeNewRevision(newRevision, fbb, bufferType, key); 431 d->storeNewRevision(newRevision, fbb, bufferType, key);
434 432
435 for (auto processor : d->processors[bufferType]) { 433 foreach (const auto &processor, d->processors[bufferType]) {
436 processor->deletedEntity(key, newRevision, *current, d->transaction); 434 processor->deletedEntity(key, newRevision, *current, d->transaction);
437 } 435 }
438 436
@@ -485,7 +483,6 @@ Preprocessor::Preprocessor() : d(new Preprocessor::Private)
485 483
486Preprocessor::~Preprocessor() 484Preprocessor::~Preprocessor()
487{ 485{
488 delete d;
489} 486}
490 487
491void Preprocessor::setup(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Pipeline *pipeline) 488void Preprocessor::setup(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Pipeline *pipeline)
diff --git a/common/pipeline.h b/common/pipeline.h
index d04d795..ef89cf0 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -72,7 +72,7 @@ signals:
72 72
73private: 73private:
74 class Private; 74 class Private;
75 Private *const d; 75 const std::unique_ptr<Private> d;
76}; 76};
77 77
78class SINK_EXPORT Preprocessor 78class SINK_EXPORT Preprocessor
@@ -103,7 +103,7 @@ protected:
103private: 103private:
104 friend class Pipeline; 104 friend class Pipeline;
105 class Private; 105 class Private;
106 Private *const d; 106 const std::unique_ptr<Private> d;
107}; 107};
108 108
109template<typename DomainType> 109template<typename DomainType>