summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-12-03 11:17:08 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-12-03 11:23:25 +0100
commit0f75ad4b96ec5994c022109278cad28a43255793 (patch)
tree6d7c6153025a79557dd1218d9be0a4a7cb7945c5
parent2c80424031c195333cfa6785ea7ab57dc9613fa3 (diff)
downloadsink-0f75ad4b96ec5994c022109278cad28a43255793.tar.gz
sink-0f75ad4b96ec5994c022109278cad28a43255793.zip
Improved resource access caching
* Smarter caching. ResourceAccess instances close after a timeout, if not reused. * Introduced a start command to avoid race condition when sending commands to a resource that is currently shutting down. * We resend pending commands after we lost access to the resource * unexpectedly.
-rw-r--r--common/clientapi.cpp14
-rw-r--r--common/clientapi.h9
-rw-r--r--common/facade.cpp32
-rw-r--r--common/resourceaccess.cpp27
-rw-r--r--common/resourceaccess.h1
-rw-r--r--tests/dummyresourcetest.cpp1
-rw-r--r--tests/resourcecommunicationtest.cpp29
7 files changed, 106 insertions, 7 deletions
diff --git a/common/clientapi.cpp b/common/clientapi.cpp
index b732205..3dc9370 100644
--- a/common/clientapi.cpp
+++ b/common/clientapi.cpp
@@ -149,23 +149,35 @@ KAsync::Job<void> Store::remove(const DomainType &domainObject)
149 149
150KAsync::Job<void> Store::shutdown(const QByteArray &identifier) 150KAsync::Job<void> Store::shutdown(const QByteArray &identifier)
151{ 151{
152 Trace() << "shutdown"; 152 Trace() << "shutdown " << identifier;
153 return ResourceAccess::connectToServer(identifier).then<void, QSharedPointer<QLocalSocket>>([identifier](QSharedPointer<QLocalSocket> socket, KAsync::Future<void> &future) { 153 return ResourceAccess::connectToServer(identifier).then<void, QSharedPointer<QLocalSocket>>([identifier](QSharedPointer<QLocalSocket> socket, KAsync::Future<void> &future) {
154 //We can't currently reuse the socket 154 //We can't currently reuse the socket
155 socket->close(); 155 socket->close();
156 auto resourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(identifier); 156 auto resourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(identifier);
157 resourceAccess->open(); 157 resourceAccess->open();
158 resourceAccess->sendCommand(Akonadi2::Commands::ShutdownCommand).then<void>([&future, resourceAccess]() { 158 resourceAccess->sendCommand(Akonadi2::Commands::ShutdownCommand).then<void>([&future, resourceAccess]() {
159 Trace() << "Shutdown complete";
159 future.setFinished(); 160 future.setFinished();
160 }).exec(); 161 }).exec();
161 }, 162 },
162 [](int, const QString &) { 163 [](int, const QString &) {
164 Trace() << "Resource is already closed.";
163 //Resource isn't started, nothing to shutdown 165 //Resource isn't started, nothing to shutdown
164 }) 166 })
165 //FIXME JOBAPI this is only required because we don't care about the return value of connectToServer 167 //FIXME JOBAPI this is only required because we don't care about the return value of connectToServer
166 .template then<void>([](){}); 168 .template then<void>([](){});
167} 169}
168 170
171KAsync::Job<void> Store::start(const QByteArray &identifier)
172{
173 Trace() << "start " << identifier;
174 auto resourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(identifier);
175 resourceAccess->open();
176 return resourceAccess->sendCommand(Akonadi2::Commands::PingCommand).then<void>([resourceAccess]() {
177 Trace() << "Start complete";
178 });
179}
180
169KAsync::Job<void> Store::synchronize(const Akonadi2::Query &query) 181KAsync::Job<void> Store::synchronize(const Akonadi2::Query &query)
170{ 182{
171 Trace() << "synchronize"; 183 Trace() << "synchronize";
diff --git a/common/clientapi.h b/common/clientapi.h
index 8f87562..edf42e4 100644
--- a/common/clientapi.h
+++ b/common/clientapi.h
@@ -80,6 +80,15 @@ public:
80 static KAsync::Job<void> shutdown(const QByteArray &resourceIdentifier); 80 static KAsync::Job<void> shutdown(const QByteArray &resourceIdentifier);
81 81
82 /** 82 /**
83 * Start resource.
84 *
85 * The resource is ready for operation once this command completes.
86 * This command is only necessary if a resource was shutdown previously,
87 * otherwise the resource process will automatically start as necessary.
88 */
89 static KAsync::Job<void> start(const QByteArray &resourceIdentifier);
90
91 /**
83 * Synchronize data to local cache. 92 * Synchronize data to local cache.
84 */ 93 */
85 static KAsync::Job<void> synchronize(const Akonadi2::Query &query); 94 static KAsync::Job<void> synchronize(const Akonadi2::Query &query);
diff --git a/common/facade.cpp b/common/facade.cpp
index ab41f96..22ef84a 100644
--- a/common/facade.cpp
+++ b/common/facade.cpp
@@ -42,12 +42,42 @@ public:
42 Akonadi2::ResourceAccess::Ptr getAccess(const QByteArray &instanceIdentifier) 42 Akonadi2::ResourceAccess::Ptr getAccess(const QByteArray &instanceIdentifier)
43 { 43 {
44 if (!mCache.contains(instanceIdentifier)) { 44 if (!mCache.contains(instanceIdentifier)) {
45 mCache.insert(instanceIdentifier, Akonadi2::ResourceAccess::Ptr::create(instanceIdentifier)); 45 //Reuse the pointer if something else kept the resourceaccess alive
46 if (mWeakCache.contains(instanceIdentifier)) {
47 auto sharedPointer = mWeakCache.value(instanceIdentifier).toStrongRef();
48 if (sharedPointer) {
49 mCache.insert(instanceIdentifier, sharedPointer);
50 }
51 }
52 if (!mCache.contains(instanceIdentifier)) {
53 //Create a new instance if necessary
54 auto sharedPointer = Akonadi2::ResourceAccess::Ptr::create(instanceIdentifier);
55 QObject::connect(sharedPointer.data(), &Akonadi2::ResourceAccess::ready, sharedPointer.data(), [this, instanceIdentifier](bool ready) {
56 if (!ready) {
57 mCache.remove(instanceIdentifier);
58 }
59 });
60 mCache.insert(instanceIdentifier, sharedPointer);
61 mWeakCache.insert(instanceIdentifier, sharedPointer);
62 }
46 } 63 }
64 if (!mTimer.contains(instanceIdentifier)) {
65 auto timer = new QTimer;
66 //Drop connection after 3 seconds (which is a random value)
67 QObject::connect(timer, &QTimer::timeout, timer, [this, instanceIdentifier]() {
68 mCache.remove(instanceIdentifier);
69 });
70 timer->setInterval(3000);
71 mTimer.insert(instanceIdentifier, timer);
72 }
73 auto timer = mTimer.value(instanceIdentifier);
74 timer->start();
47 return mCache.value(instanceIdentifier); 75 return mCache.value(instanceIdentifier);
48 } 76 }
49 77
78 QHash<QByteArray, QWeakPointer<Akonadi2::ResourceAccess> > mWeakCache;
50 QHash<QByteArray, Akonadi2::ResourceAccess::Ptr> mCache; 79 QHash<QByteArray, Akonadi2::ResourceAccess::Ptr> mCache;
80 QHash<QByteArray, QTimer*> mTimer;
51}; 81};
52 82
53template<class DomainType> 83template<class DomainType>
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 8988032..7be1259 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -368,6 +368,8 @@ void ResourceAccess::open()
368void ResourceAccess::close() 368void ResourceAccess::close()
369{ 369{
370 log(QString("Closing %1").arg(d->socket->fullServerName())); 370 log(QString("Closing %1").arg(d->socket->fullServerName()));
371 Trace() << "Pending commands: " << d->pendingCommands.size();
372 Trace() << "Queued commands: " << d->commandQueue.size();
371 d->socket->close(); 373 d->socket->close();
372} 374}
373 375
@@ -393,12 +395,24 @@ void ResourceAccess::processCommandQueue()
393{ 395{
394 //TODO: serialize instead of blast them all through the socket? 396 //TODO: serialize instead of blast them all through the socket?
395 Trace() << "We have " << d->commandQueue.size() << " queued commands"; 397 Trace() << "We have " << d->commandQueue.size() << " queued commands";
398 Trace() << "Pending commands: " << d->pendingCommands.size();
396 for (auto command: d->commandQueue) { 399 for (auto command: d->commandQueue) {
397 sendCommand(command); 400 sendCommand(command);
398 } 401 }
399 d->commandQueue.clear(); 402 d->commandQueue.clear();
400} 403}
401 404
405void ResourceAccess::processPendingCommandQueue()
406{
407 Trace() << "We have " << d->pendingCommands.size() << " pending commands";
408 for (auto command: d->pendingCommands) {
409 Trace() << "Reenquing command " << command->commandId;
410 d->commandQueue << command;
411 }
412 d->pendingCommands.clear();
413 processCommandQueue();
414}
415
402void ResourceAccess::connected() 416void ResourceAccess::connected()
403{ 417{
404 if (!isReady()) { 418 if (!isReady()) {
@@ -415,6 +429,9 @@ void ResourceAccess::connected()
415 Commands::write(d->socket.data(), ++d->messageId, Commands::HandshakeCommand, fbb); 429 Commands::write(d->socket.data(), ++d->messageId, Commands::HandshakeCommand, fbb);
416 } 430 }
417 431
432 //Reenqueue pending commands, we failed to send them
433 processPendingCommandQueue();
434 //Send queued commands
418 processCommandQueue(); 435 processCommandQueue();
419 436
420 emit ready(true); 437 emit ready(true);
@@ -424,8 +441,6 @@ void ResourceAccess::disconnected()
424{ 441{
425 log(QString("Disconnected from %1").arg(d->socket->fullServerName())); 442 log(QString("Disconnected from %1").arg(d->socket->fullServerName()));
426 d->socket->close(); 443 d->socket->close();
427 //TODO fail all existing jobs? or retry
428 d->abortPendingOperations();
429 emit ready(false); 444 emit ready(false);
430} 445}
431 446
@@ -433,12 +448,14 @@ void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error)
433{ 448{
434 if (error == QLocalSocket::PeerClosedError) { 449 if (error == QLocalSocket::PeerClosedError) {
435 Log(d->resourceInstanceIdentifier) << "The resource closed the connection."; 450 Log(d->resourceInstanceIdentifier) << "The resource closed the connection.";
451 d->abortPendingOperations();
436 } else { 452 } else {
437 Warning() << QString("Connection error: %1 : %2").arg(error).arg(d->socket->errorString()); 453 Warning() << QString("Connection error: %1 : %2").arg(error).arg(d->socket->errorString());
454 if (d->pendingCommands.size()) {
455 Trace() << "Reconnecting due to pending operations: " << d->pendingCommands.size();
456 open();
457 }
438 } 458 }
439
440 //TODO We could first try to reconnect and resend the message if necessary.
441 d->abortPendingOperations();
442} 459}
443 460
444void ResourceAccess::readResourceMessage() 461void ResourceAccess::readResourceMessage()
diff --git a/common/resourceaccess.h b/common/resourceaccess.h
index 527cfa3..7f61b30 100644
--- a/common/resourceaccess.h
+++ b/common/resourceaccess.h
@@ -102,6 +102,7 @@ private:
102 102
103 void sendCommand(const QSharedPointer<QueuedCommand> &command); 103 void sendCommand(const QSharedPointer<QueuedCommand> &command);
104 void processCommandQueue(); 104 void processCommandQueue();
105 void processPendingCommandQueue();
105 106
106 class Private; 107 class Private;
107 Private * const d; 108 Private * const d;
diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp
index 20c725f..b8711a2 100644
--- a/tests/dummyresourcetest.cpp
+++ b/tests/dummyresourcetest.cpp
@@ -35,6 +35,7 @@ private Q_SLOTS:
35 DummyResource::removeFromDisk("org.kde.dummy.instance1"); 35 DummyResource::removeFromDisk("org.kde.dummy.instance1");
36 auto factory = Akonadi2::ResourceFactory::load("org.kde.dummy"); 36 auto factory = Akonadi2::ResourceFactory::load("org.kde.dummy");
37 QVERIFY(factory); 37 QVERIFY(factory);
38 Akonadi2::Store::start(QByteArray("org.kde.dummy.instance1")).exec().waitForFinished();
38 } 39 }
39 40
40 void init() 41 void init()
diff --git a/tests/resourcecommunicationtest.cpp b/tests/resourcecommunicationtest.cpp
index fa2b5a1..18e9223 100644
--- a/tests/resourcecommunicationtest.cpp
+++ b/tests/resourcecommunicationtest.cpp
@@ -63,6 +63,35 @@ private Q_SLOTS:
63 QTRY_COMPARE(complete, count); 63 QTRY_COMPARE(complete, count);
64 QVERIFY(!errors); 64 QVERIFY(!errors);
65 } 65 }
66
67 void testResourceAccessReuse()
68 {
69 qDebug();
70 const QByteArray resourceIdentifier("test");
71 Listener listener(resourceIdentifier);
72 Akonadi2::ResourceAccess resourceAccess(resourceIdentifier);
73 resourceAccess.open();
74
75 const int count = 10;
76 int complete = 0;
77 int errors = 0;
78 for (int i = 0; i < count; i++) {
79 resourceAccess.sendCommand(Akonadi2::Commands::PingCommand)
80 .then<void>([&complete]() {
81 complete++;
82 },
83 [&errors, &complete](int error, const QString &msg) {
84 qWarning() << msg;
85 errors++;
86 complete++;
87 }).then<void>([&resourceAccess]() {
88 resourceAccess.close();
89 resourceAccess.open();
90 }).exec().waitForFinished();
91 }
92 QTRY_COMPARE(complete, count);
93 QVERIFY(!errors);
94 }
66}; 95};
67 96
68QTEST_MAIN(ResourceCommunicationTest) 97QTEST_MAIN(ResourceCommunicationTest)