diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-12-03 11:17:08 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-12-03 11:23:25 +0100 |
commit | 0f75ad4b96ec5994c022109278cad28a43255793 (patch) | |
tree | 6d7c6153025a79557dd1218d9be0a4a7cb7945c5 | |
parent | 2c80424031c195333cfa6785ea7ab57dc9613fa3 (diff) | |
download | sink-0f75ad4b96ec5994c022109278cad28a43255793.tar.gz sink-0f75ad4b96ec5994c022109278cad28a43255793.zip |
Improved resource access caching
* Smarter caching. ResourceAccess instances close after a timeout, if not reused.
* Introduced a start command to avoid race condition when sending
commands to a resource that is currently shutting down.
* We resend pending commands after we lost access to the resource
* unexpectedly.
-rw-r--r-- | common/clientapi.cpp | 14 | ||||
-rw-r--r-- | common/clientapi.h | 9 | ||||
-rw-r--r-- | common/facade.cpp | 32 | ||||
-rw-r--r-- | common/resourceaccess.cpp | 27 | ||||
-rw-r--r-- | common/resourceaccess.h | 1 | ||||
-rw-r--r-- | tests/dummyresourcetest.cpp | 1 | ||||
-rw-r--r-- | tests/resourcecommunicationtest.cpp | 29 |
7 files changed, 106 insertions, 7 deletions
diff --git a/common/clientapi.cpp b/common/clientapi.cpp index b732205..3dc9370 100644 --- a/common/clientapi.cpp +++ b/common/clientapi.cpp | |||
@@ -149,23 +149,35 @@ KAsync::Job<void> Store::remove(const DomainType &domainObject) | |||
149 | 149 | ||
150 | KAsync::Job<void> Store::shutdown(const QByteArray &identifier) | 150 | KAsync::Job<void> Store::shutdown(const QByteArray &identifier) |
151 | { | 151 | { |
152 | Trace() << "shutdown"; | 152 | Trace() << "shutdown " << identifier; |
153 | return ResourceAccess::connectToServer(identifier).then<void, QSharedPointer<QLocalSocket>>([identifier](QSharedPointer<QLocalSocket> socket, KAsync::Future<void> &future) { | 153 | return ResourceAccess::connectToServer(identifier).then<void, QSharedPointer<QLocalSocket>>([identifier](QSharedPointer<QLocalSocket> socket, KAsync::Future<void> &future) { |
154 | //We can't currently reuse the socket | 154 | //We can't currently reuse the socket |
155 | socket->close(); | 155 | socket->close(); |
156 | auto resourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(identifier); | 156 | auto resourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(identifier); |
157 | resourceAccess->open(); | 157 | resourceAccess->open(); |
158 | resourceAccess->sendCommand(Akonadi2::Commands::ShutdownCommand).then<void>([&future, resourceAccess]() { | 158 | resourceAccess->sendCommand(Akonadi2::Commands::ShutdownCommand).then<void>([&future, resourceAccess]() { |
159 | Trace() << "Shutdown complete"; | ||
159 | future.setFinished(); | 160 | future.setFinished(); |
160 | }).exec(); | 161 | }).exec(); |
161 | }, | 162 | }, |
162 | [](int, const QString &) { | 163 | [](int, const QString &) { |
164 | Trace() << "Resource is already closed."; | ||
163 | //Resource isn't started, nothing to shutdown | 165 | //Resource isn't started, nothing to shutdown |
164 | }) | 166 | }) |
165 | //FIXME JOBAPI this is only required because we don't care about the return value of connectToServer | 167 | //FIXME JOBAPI this is only required because we don't care about the return value of connectToServer |
166 | .template then<void>([](){}); | 168 | .template then<void>([](){}); |
167 | } | 169 | } |
168 | 170 | ||
171 | KAsync::Job<void> Store::start(const QByteArray &identifier) | ||
172 | { | ||
173 | Trace() << "start " << identifier; | ||
174 | auto resourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(identifier); | ||
175 | resourceAccess->open(); | ||
176 | return resourceAccess->sendCommand(Akonadi2::Commands::PingCommand).then<void>([resourceAccess]() { | ||
177 | Trace() << "Start complete"; | ||
178 | }); | ||
179 | } | ||
180 | |||
169 | KAsync::Job<void> Store::synchronize(const Akonadi2::Query &query) | 181 | KAsync::Job<void> Store::synchronize(const Akonadi2::Query &query) |
170 | { | 182 | { |
171 | Trace() << "synchronize"; | 183 | Trace() << "synchronize"; |
diff --git a/common/clientapi.h b/common/clientapi.h index 8f87562..edf42e4 100644 --- a/common/clientapi.h +++ b/common/clientapi.h | |||
@@ -80,6 +80,15 @@ public: | |||
80 | static KAsync::Job<void> shutdown(const QByteArray &resourceIdentifier); | 80 | static KAsync::Job<void> shutdown(const QByteArray &resourceIdentifier); |
81 | 81 | ||
82 | /** | 82 | /** |
83 | * Start resource. | ||
84 | * | ||
85 | * The resource is ready for operation once this command completes. | ||
86 | * This command is only necessary if a resource was shutdown previously, | ||
87 | * otherwise the resource process will automatically start as necessary. | ||
88 | */ | ||
89 | static KAsync::Job<void> start(const QByteArray &resourceIdentifier); | ||
90 | |||
91 | /** | ||
83 | * Synchronize data to local cache. | 92 | * Synchronize data to local cache. |
84 | */ | 93 | */ |
85 | static KAsync::Job<void> synchronize(const Akonadi2::Query &query); | 94 | static KAsync::Job<void> synchronize(const Akonadi2::Query &query); |
diff --git a/common/facade.cpp b/common/facade.cpp index ab41f96..22ef84a 100644 --- a/common/facade.cpp +++ b/common/facade.cpp | |||
@@ -42,12 +42,42 @@ public: | |||
42 | Akonadi2::ResourceAccess::Ptr getAccess(const QByteArray &instanceIdentifier) | 42 | Akonadi2::ResourceAccess::Ptr getAccess(const QByteArray &instanceIdentifier) |
43 | { | 43 | { |
44 | if (!mCache.contains(instanceIdentifier)) { | 44 | if (!mCache.contains(instanceIdentifier)) { |
45 | mCache.insert(instanceIdentifier, Akonadi2::ResourceAccess::Ptr::create(instanceIdentifier)); | 45 | //Reuse the pointer if something else kept the resourceaccess alive |
46 | if (mWeakCache.contains(instanceIdentifier)) { | ||
47 | auto sharedPointer = mWeakCache.value(instanceIdentifier).toStrongRef(); | ||
48 | if (sharedPointer) { | ||
49 | mCache.insert(instanceIdentifier, sharedPointer); | ||
50 | } | ||
51 | } | ||
52 | if (!mCache.contains(instanceIdentifier)) { | ||
53 | //Create a new instance if necessary | ||
54 | auto sharedPointer = Akonadi2::ResourceAccess::Ptr::create(instanceIdentifier); | ||
55 | QObject::connect(sharedPointer.data(), &Akonadi2::ResourceAccess::ready, sharedPointer.data(), [this, instanceIdentifier](bool ready) { | ||
56 | if (!ready) { | ||
57 | mCache.remove(instanceIdentifier); | ||
58 | } | ||
59 | }); | ||
60 | mCache.insert(instanceIdentifier, sharedPointer); | ||
61 | mWeakCache.insert(instanceIdentifier, sharedPointer); | ||
62 | } | ||
46 | } | 63 | } |
64 | if (!mTimer.contains(instanceIdentifier)) { | ||
65 | auto timer = new QTimer; | ||
66 | //Drop connection after 3 seconds (which is a random value) | ||
67 | QObject::connect(timer, &QTimer::timeout, timer, [this, instanceIdentifier]() { | ||
68 | mCache.remove(instanceIdentifier); | ||
69 | }); | ||
70 | timer->setInterval(3000); | ||
71 | mTimer.insert(instanceIdentifier, timer); | ||
72 | } | ||
73 | auto timer = mTimer.value(instanceIdentifier); | ||
74 | timer->start(); | ||
47 | return mCache.value(instanceIdentifier); | 75 | return mCache.value(instanceIdentifier); |
48 | } | 76 | } |
49 | 77 | ||
78 | QHash<QByteArray, QWeakPointer<Akonadi2::ResourceAccess> > mWeakCache; | ||
50 | QHash<QByteArray, Akonadi2::ResourceAccess::Ptr> mCache; | 79 | QHash<QByteArray, Akonadi2::ResourceAccess::Ptr> mCache; |
80 | QHash<QByteArray, QTimer*> mTimer; | ||
51 | }; | 81 | }; |
52 | 82 | ||
53 | template<class DomainType> | 83 | template<class DomainType> |
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 8988032..7be1259 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -368,6 +368,8 @@ void ResourceAccess::open() | |||
368 | void ResourceAccess::close() | 368 | void ResourceAccess::close() |
369 | { | 369 | { |
370 | log(QString("Closing %1").arg(d->socket->fullServerName())); | 370 | log(QString("Closing %1").arg(d->socket->fullServerName())); |
371 | Trace() << "Pending commands: " << d->pendingCommands.size(); | ||
372 | Trace() << "Queued commands: " << d->commandQueue.size(); | ||
371 | d->socket->close(); | 373 | d->socket->close(); |
372 | } | 374 | } |
373 | 375 | ||
@@ -393,12 +395,24 @@ void ResourceAccess::processCommandQueue() | |||
393 | { | 395 | { |
394 | //TODO: serialize instead of blast them all through the socket? | 396 | //TODO: serialize instead of blast them all through the socket? |
395 | Trace() << "We have " << d->commandQueue.size() << " queued commands"; | 397 | Trace() << "We have " << d->commandQueue.size() << " queued commands"; |
398 | Trace() << "Pending commands: " << d->pendingCommands.size(); | ||
396 | for (auto command: d->commandQueue) { | 399 | for (auto command: d->commandQueue) { |
397 | sendCommand(command); | 400 | sendCommand(command); |
398 | } | 401 | } |
399 | d->commandQueue.clear(); | 402 | d->commandQueue.clear(); |
400 | } | 403 | } |
401 | 404 | ||
405 | void ResourceAccess::processPendingCommandQueue() | ||
406 | { | ||
407 | Trace() << "We have " << d->pendingCommands.size() << " pending commands"; | ||
408 | for (auto command: d->pendingCommands) { | ||
409 | Trace() << "Reenquing command " << command->commandId; | ||
410 | d->commandQueue << command; | ||
411 | } | ||
412 | d->pendingCommands.clear(); | ||
413 | processCommandQueue(); | ||
414 | } | ||
415 | |||
402 | void ResourceAccess::connected() | 416 | void ResourceAccess::connected() |
403 | { | 417 | { |
404 | if (!isReady()) { | 418 | if (!isReady()) { |
@@ -415,6 +429,9 @@ void ResourceAccess::connected() | |||
415 | Commands::write(d->socket.data(), ++d->messageId, Commands::HandshakeCommand, fbb); | 429 | Commands::write(d->socket.data(), ++d->messageId, Commands::HandshakeCommand, fbb); |
416 | } | 430 | } |
417 | 431 | ||
432 | //Reenqueue pending commands, we failed to send them | ||
433 | processPendingCommandQueue(); | ||
434 | //Send queued commands | ||
418 | processCommandQueue(); | 435 | processCommandQueue(); |
419 | 436 | ||
420 | emit ready(true); | 437 | emit ready(true); |
@@ -424,8 +441,6 @@ void ResourceAccess::disconnected() | |||
424 | { | 441 | { |
425 | log(QString("Disconnected from %1").arg(d->socket->fullServerName())); | 442 | log(QString("Disconnected from %1").arg(d->socket->fullServerName())); |
426 | d->socket->close(); | 443 | d->socket->close(); |
427 | //TODO fail all existing jobs? or retry | ||
428 | d->abortPendingOperations(); | ||
429 | emit ready(false); | 444 | emit ready(false); |
430 | } | 445 | } |
431 | 446 | ||
@@ -433,12 +448,14 @@ void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error) | |||
433 | { | 448 | { |
434 | if (error == QLocalSocket::PeerClosedError) { | 449 | if (error == QLocalSocket::PeerClosedError) { |
435 | Log(d->resourceInstanceIdentifier) << "The resource closed the connection."; | 450 | Log(d->resourceInstanceIdentifier) << "The resource closed the connection."; |
451 | d->abortPendingOperations(); | ||
436 | } else { | 452 | } else { |
437 | Warning() << QString("Connection error: %1 : %2").arg(error).arg(d->socket->errorString()); | 453 | Warning() << QString("Connection error: %1 : %2").arg(error).arg(d->socket->errorString()); |
454 | if (d->pendingCommands.size()) { | ||
455 | Trace() << "Reconnecting due to pending operations: " << d->pendingCommands.size(); | ||
456 | open(); | ||
457 | } | ||
438 | } | 458 | } |
439 | |||
440 | //TODO We could first try to reconnect and resend the message if necessary. | ||
441 | d->abortPendingOperations(); | ||
442 | } | 459 | } |
443 | 460 | ||
444 | void ResourceAccess::readResourceMessage() | 461 | void ResourceAccess::readResourceMessage() |
diff --git a/common/resourceaccess.h b/common/resourceaccess.h index 527cfa3..7f61b30 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h | |||
@@ -102,6 +102,7 @@ private: | |||
102 | 102 | ||
103 | void sendCommand(const QSharedPointer<QueuedCommand> &command); | 103 | void sendCommand(const QSharedPointer<QueuedCommand> &command); |
104 | void processCommandQueue(); | 104 | void processCommandQueue(); |
105 | void processPendingCommandQueue(); | ||
105 | 106 | ||
106 | class Private; | 107 | class Private; |
107 | Private * const d; | 108 | Private * const d; |
diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp index 20c725f..b8711a2 100644 --- a/tests/dummyresourcetest.cpp +++ b/tests/dummyresourcetest.cpp | |||
@@ -35,6 +35,7 @@ private Q_SLOTS: | |||
35 | DummyResource::removeFromDisk("org.kde.dummy.instance1"); | 35 | DummyResource::removeFromDisk("org.kde.dummy.instance1"); |
36 | auto factory = Akonadi2::ResourceFactory::load("org.kde.dummy"); | 36 | auto factory = Akonadi2::ResourceFactory::load("org.kde.dummy"); |
37 | QVERIFY(factory); | 37 | QVERIFY(factory); |
38 | Akonadi2::Store::start(QByteArray("org.kde.dummy.instance1")).exec().waitForFinished(); | ||
38 | } | 39 | } |
39 | 40 | ||
40 | void init() | 41 | void init() |
diff --git a/tests/resourcecommunicationtest.cpp b/tests/resourcecommunicationtest.cpp index fa2b5a1..18e9223 100644 --- a/tests/resourcecommunicationtest.cpp +++ b/tests/resourcecommunicationtest.cpp | |||
@@ -63,6 +63,35 @@ private Q_SLOTS: | |||
63 | QTRY_COMPARE(complete, count); | 63 | QTRY_COMPARE(complete, count); |
64 | QVERIFY(!errors); | 64 | QVERIFY(!errors); |
65 | } | 65 | } |
66 | |||
67 | void testResourceAccessReuse() | ||
68 | { | ||
69 | qDebug(); | ||
70 | const QByteArray resourceIdentifier("test"); | ||
71 | Listener listener(resourceIdentifier); | ||
72 | Akonadi2::ResourceAccess resourceAccess(resourceIdentifier); | ||
73 | resourceAccess.open(); | ||
74 | |||
75 | const int count = 10; | ||
76 | int complete = 0; | ||
77 | int errors = 0; | ||
78 | for (int i = 0; i < count; i++) { | ||
79 | resourceAccess.sendCommand(Akonadi2::Commands::PingCommand) | ||
80 | .then<void>([&complete]() { | ||
81 | complete++; | ||
82 | }, | ||
83 | [&errors, &complete](int error, const QString &msg) { | ||
84 | qWarning() << msg; | ||
85 | errors++; | ||
86 | complete++; | ||
87 | }).then<void>([&resourceAccess]() { | ||
88 | resourceAccess.close(); | ||
89 | resourceAccess.open(); | ||
90 | }).exec().waitForFinished(); | ||
91 | } | ||
92 | QTRY_COMPARE(complete, count); | ||
93 | QVERIFY(!errors); | ||
94 | } | ||
66 | }; | 95 | }; |
67 | 96 | ||
68 | QTEST_MAIN(ResourceCommunicationTest) | 97 | QTEST_MAIN(ResourceCommunicationTest) |