diff options
Diffstat (limited to 'common')
-rw-r--r-- | common/clientapi.cpp | 5 | ||||
-rw-r--r-- | common/clientapi.h | 17 | ||||
-rw-r--r-- | common/pipeline.cpp | 5 | ||||
-rw-r--r-- | common/resourceaccess.cpp | 27 |
4 files changed, 27 insertions, 27 deletions
diff --git a/common/clientapi.cpp b/common/clientapi.cpp index 10115da..6f0b421 100644 --- a/common/clientapi.cpp +++ b/common/clientapi.cpp | |||
@@ -36,10 +36,7 @@ void Store::shutdown(const QString &identifier) | |||
36 | Akonadi2::ResourceAccess resourceAccess(identifier); | 36 | Akonadi2::ResourceAccess resourceAccess(identifier); |
37 | //FIXME this starts the resource, just to shut it down again if it's not running in the first place. | 37 | //FIXME this starts the resource, just to shut it down again if it's not running in the first place. |
38 | resourceAccess.open(); | 38 | resourceAccess.open(); |
39 | resourceAccess.sendCommand(Akonadi2::Commands::ShutdownCommand).then<void>([](Async::Future<void> &f){ | 39 | resourceAccess.sendCommand(Akonadi2::Commands::ShutdownCommand).exec().waitForFinished(); |
40 | //TODO wait for disconnect | ||
41 | f.setFinished(); | ||
42 | }).exec().waitForFinished(); | ||
43 | } | 40 | } |
44 | 41 | ||
45 | } // namespace Akonadi2 | 42 | } // namespace Akonadi2 |
diff --git a/common/clientapi.h b/common/clientapi.h index aa3aab8..63305ab 100644 --- a/common/clientapi.h +++ b/common/clientapi.h | |||
@@ -425,13 +425,22 @@ public: | |||
425 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); | 425 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); |
426 | //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive. | 426 | //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive. |
427 | std::function<void(const typename DomainType::Ptr &)> addCallback = std::bind(&ResultProvider<typename DomainType::Ptr>::add, resultSet, std::placeholders::_1); | 427 | std::function<void(const typename DomainType::Ptr &)> addCallback = std::bind(&ResultProvider<typename DomainType::Ptr>::add, resultSet, std::placeholders::_1); |
428 | //We copy the facade pointer to keep it alive | 428 | |
429 | job = job.then(facade->load(query, addCallback)); | 429 | // TODO The following is a necessary hack to keep the facade alive. |
430 | // Otherwise this would reduce to: | ||
431 | // job = job.then(facade->load(query, addCallback)); | ||
432 | // We somehow have to guarantee that the facade remains valid for the duration of the job | ||
433 | job = job.then<void>([facade, query, addCallback](Async::Future<void> &future) { | ||
434 | Async::Job<void> j = facade->load(query, addCallback); | ||
435 | j.then<void>([&future, facade](Async::Future<void> &f) { | ||
436 | future.setFinished(); | ||
437 | f.setFinished(); | ||
438 | }).exec(); | ||
439 | }); | ||
430 | } | 440 | } |
431 | job.then<void>([/* eventloop, */resultSet](Async::Future<void> &future) { | 441 | job.then<void>([resultSet]() { |
432 | qDebug() << "Query complete"; | 442 | qDebug() << "Query complete"; |
433 | resultSet->complete(); | 443 | resultSet->complete(); |
434 | future.setFinished(); | ||
435 | }).exec().waitForFinished(); //We use the eventloop provided by waitForFinished to keep the thread alive until all is done | 444 | }).exec().waitForFinished(); //We use the eventloop provided by waitForFinished to keep the thread alive until all is done |
436 | }); | 445 | }); |
437 | return resultSet->emitter(); | 446 | return resultSet->emitter(); |
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index f321cf5..ed40699 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -30,6 +30,7 @@ | |||
30 | #include "createentity_generated.h" | 30 | #include "createentity_generated.h" |
31 | #include "entitybuffer.h" | 31 | #include "entitybuffer.h" |
32 | #include "async/src/async.h" | 32 | #include "async/src/async.h" |
33 | #include "log.h" | ||
33 | 34 | ||
34 | namespace Akonadi2 | 35 | namespace Akonadi2 |
35 | { | 36 | { |
@@ -95,7 +96,7 @@ void Pipeline::null() | |||
95 | 96 | ||
96 | Async::Job<void> Pipeline::newEntity(void const *command, size_t size) | 97 | Async::Job<void> Pipeline::newEntity(void const *command, size_t size) |
97 | { | 98 | { |
98 | qDebug() << "Pipeline: New Entity"; | 99 | Log() << "Pipeline: New Entity"; |
99 | 100 | ||
100 | //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. | 101 | //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. |
101 | const auto key = QUuid::createUuid().toString().toUtf8(); | 102 | const auto key = QUuid::createUuid().toString().toUtf8(); |
@@ -136,7 +137,7 @@ Async::Job<void> Pipeline::newEntity(void const *command, size_t size) | |||
136 | 137 | ||
137 | storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); | 138 | storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); |
138 | storage().setMaxRevision(newRevision); | 139 | storage().setMaxRevision(newRevision); |
139 | qDebug() << "Pipeline: wrote entity: "<< newRevision; | 140 | Log() << "Pipeline: wrote entity: "<< newRevision; |
140 | 141 | ||
141 | return Async::start<void>([this, key, entityType](Async::Future<void> &future) { | 142 | return Async::start<void>([this, key, entityType](Async::Future<void> &future) { |
142 | PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() { | 143 | PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() { |
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 22d7d97..7320e50 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -41,23 +41,14 @@ class QueuedCommand | |||
41 | public: | 41 | public: |
42 | QueuedCommand(int commandId, const std::function<void(int, const QString &)> &callback) | 42 | QueuedCommand(int commandId, const std::function<void(int, const QString &)> &callback) |
43 | : commandId(commandId), | 43 | : commandId(commandId), |
44 | bufferSize(0), | ||
45 | buffer(0), | ||
46 | callback(callback) | 44 | callback(callback) |
47 | {} | 45 | {} |
48 | 46 | ||
49 | QueuedCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void(int, const QString &)> &callback) | 47 | QueuedCommand(int commandId, const QByteArray &b, const std::function<void(int, const QString &)> &callback) |
50 | : commandId(commandId), | 48 | : commandId(commandId), |
51 | bufferSize(fbb.GetSize()), | 49 | buffer(b), |
52 | buffer(new char[bufferSize]), | ||
53 | callback(callback) | 50 | callback(callback) |
54 | { | 51 | { |
55 | memcpy(buffer, fbb.GetBufferPointer(), bufferSize); | ||
56 | } | ||
57 | |||
58 | ~QueuedCommand() | ||
59 | { | ||
60 | delete[] buffer; | ||
61 | } | 52 | } |
62 | 53 | ||
63 | private: | 54 | private: |
@@ -66,8 +57,7 @@ private: | |||
66 | 57 | ||
67 | public: | 58 | public: |
68 | const int commandId; | 59 | const int commandId; |
69 | const uint bufferSize; | 60 | QByteArray buffer; |
70 | char *buffer; | ||
71 | std::function<void(int, const QString &)> callback; | 61 | std::function<void(int, const QString &)> callback; |
72 | }; | 62 | }; |
73 | 63 | ||
@@ -159,7 +149,9 @@ Async::Job<void> ResourceAccess::sendCommand(int commandId) | |||
159 | 149 | ||
160 | Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) | 150 | Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) |
161 | { | 151 | { |
162 | return Async::start<void>([commandId, &fbb, this](Async::Future<void> &f) { | 152 | //The flatbuffer is transient, but we want to store it until the job is executed |
153 | QByteArray buffer(reinterpret_cast<const char*>(fbb.GetBufferPointer()), fbb.GetSize()); | ||
154 | return Async::start<void>([commandId, buffer, this](Async::Future<void> &f) { | ||
163 | auto callback = [&f](int error, const QString &errorMessage) { | 155 | auto callback = [&f](int error, const QString &errorMessage) { |
164 | if (error) { | 156 | if (error) { |
165 | f.setError(error, errorMessage); | 157 | f.setError(error, errorMessage); |
@@ -169,12 +161,13 @@ Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu | |||
169 | }; | 161 | }; |
170 | 162 | ||
171 | if (isReady()) { | 163 | if (isReady()) { |
164 | //TODO: We probably always want to queue the command, so we can resend it in case something goes wrong | ||
172 | d->messageId++; | 165 | d->messageId++; |
173 | log(QString("Sending command %1 with messageId %2").arg(commandId).arg(d->messageId)); | 166 | log(QString("Sending command %1 with messageId %2").arg(commandId).arg(d->messageId)); |
174 | registerCallback(d->messageId, callback); | 167 | registerCallback(d->messageId, callback); |
175 | Commands::write(d->socket, d->messageId, commandId, fbb); | 168 | Commands::write(d->socket, d->messageId, commandId, buffer.constData(), buffer.size()); |
176 | } else { | 169 | } else { |
177 | d->commandQueue << new QueuedCommand(commandId, fbb, callback); | 170 | d->commandQueue << new QueuedCommand(commandId, buffer, callback); |
178 | } | 171 | } |
179 | }); | 172 | }); |
180 | } | 173 | } |
@@ -237,7 +230,7 @@ void ResourceAccess::connected() | |||
237 | if (command->callback) { | 230 | if (command->callback) { |
238 | registerCallback(d->messageId, command->callback); | 231 | registerCallback(d->messageId, command->callback); |
239 | } | 232 | } |
240 | Commands::write(d->socket, d->messageId, command->commandId, command->buffer, command->bufferSize); | 233 | Commands::write(d->socket, d->messageId, command->commandId, command->buffer.constData(), command->buffer.size()); |
241 | delete command; | 234 | delete command; |
242 | } | 235 | } |
243 | d->commandQueue.clear(); | 236 | d->commandQueue.clear(); |