diff options
-rw-r--r-- | common/clientapi.cpp | 5 | ||||
-rw-r--r-- | common/clientapi.h | 17 | ||||
-rw-r--r-- | common/pipeline.cpp | 5 | ||||
-rw-r--r-- | common/resourceaccess.cpp | 27 | ||||
-rw-r--r-- | dummyresource/domainadaptor.cpp | 14 | ||||
-rw-r--r-- | dummyresource/resourcefactory.cpp | 22 | ||||
-rw-r--r-- | synchronizer/listener.cpp | 4 | ||||
-rw-r--r-- | tests/domainadaptortest.cpp | 14 |
8 files changed, 63 insertions, 45 deletions
diff --git a/common/clientapi.cpp b/common/clientapi.cpp index 10115da..6f0b421 100644 --- a/common/clientapi.cpp +++ b/common/clientapi.cpp | |||
@@ -36,10 +36,7 @@ void Store::shutdown(const QString &identifier) | |||
36 | Akonadi2::ResourceAccess resourceAccess(identifier); | 36 | Akonadi2::ResourceAccess resourceAccess(identifier); |
37 | //FIXME this starts the resource, just to shut it down again if it's not running in the first place. | 37 | //FIXME this starts the resource, just to shut it down again if it's not running in the first place. |
38 | resourceAccess.open(); | 38 | resourceAccess.open(); |
39 | resourceAccess.sendCommand(Akonadi2::Commands::ShutdownCommand).then<void>([](Async::Future<void> &f){ | 39 | resourceAccess.sendCommand(Akonadi2::Commands::ShutdownCommand).exec().waitForFinished(); |
40 | //TODO wait for disconnect | ||
41 | f.setFinished(); | ||
42 | }).exec().waitForFinished(); | ||
43 | } | 40 | } |
44 | 41 | ||
45 | } // namespace Akonadi2 | 42 | } // namespace Akonadi2 |
diff --git a/common/clientapi.h b/common/clientapi.h index aa3aab8..63305ab 100644 --- a/common/clientapi.h +++ b/common/clientapi.h | |||
@@ -425,13 +425,22 @@ public: | |||
425 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); | 425 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); |
426 | //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive. | 426 | //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive. |
427 | std::function<void(const typename DomainType::Ptr &)> addCallback = std::bind(&ResultProvider<typename DomainType::Ptr>::add, resultSet, std::placeholders::_1); | 427 | std::function<void(const typename DomainType::Ptr &)> addCallback = std::bind(&ResultProvider<typename DomainType::Ptr>::add, resultSet, std::placeholders::_1); |
428 | //We copy the facade pointer to keep it alive | 428 | |
429 | job = job.then(facade->load(query, addCallback)); | 429 | // TODO The following is a necessary hack to keep the facade alive. |
430 | // Otherwise this would reduce to: | ||
431 | // job = job.then(facade->load(query, addCallback)); | ||
432 | // We somehow have to guarantee that the facade remains valid for the duration of the job | ||
433 | job = job.then<void>([facade, query, addCallback](Async::Future<void> &future) { | ||
434 | Async::Job<void> j = facade->load(query, addCallback); | ||
435 | j.then<void>([&future, facade](Async::Future<void> &f) { | ||
436 | future.setFinished(); | ||
437 | f.setFinished(); | ||
438 | }).exec(); | ||
439 | }); | ||
430 | } | 440 | } |
431 | job.then<void>([/* eventloop, */resultSet](Async::Future<void> &future) { | 441 | job.then<void>([resultSet]() { |
432 | qDebug() << "Query complete"; | 442 | qDebug() << "Query complete"; |
433 | resultSet->complete(); | 443 | resultSet->complete(); |
434 | future.setFinished(); | ||
435 | }).exec().waitForFinished(); //We use the eventloop provided by waitForFinished to keep the thread alive until all is done | 444 | }).exec().waitForFinished(); //We use the eventloop provided by waitForFinished to keep the thread alive until all is done |
436 | }); | 445 | }); |
437 | return resultSet->emitter(); | 446 | return resultSet->emitter(); |
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index f321cf5..ed40699 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -30,6 +30,7 @@ | |||
30 | #include "createentity_generated.h" | 30 | #include "createentity_generated.h" |
31 | #include "entitybuffer.h" | 31 | #include "entitybuffer.h" |
32 | #include "async/src/async.h" | 32 | #include "async/src/async.h" |
33 | #include "log.h" | ||
33 | 34 | ||
34 | namespace Akonadi2 | 35 | namespace Akonadi2 |
35 | { | 36 | { |
@@ -95,7 +96,7 @@ void Pipeline::null() | |||
95 | 96 | ||
96 | Async::Job<void> Pipeline::newEntity(void const *command, size_t size) | 97 | Async::Job<void> Pipeline::newEntity(void const *command, size_t size) |
97 | { | 98 | { |
98 | qDebug() << "Pipeline: New Entity"; | 99 | Log() << "Pipeline: New Entity"; |
99 | 100 | ||
100 | //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. | 101 | //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. |
101 | const auto key = QUuid::createUuid().toString().toUtf8(); | 102 | const auto key = QUuid::createUuid().toString().toUtf8(); |
@@ -136,7 +137,7 @@ Async::Job<void> Pipeline::newEntity(void const *command, size_t size) | |||
136 | 137 | ||
137 | storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); | 138 | storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); |
138 | storage().setMaxRevision(newRevision); | 139 | storage().setMaxRevision(newRevision); |
139 | qDebug() << "Pipeline: wrote entity: "<< newRevision; | 140 | Log() << "Pipeline: wrote entity: "<< newRevision; |
140 | 141 | ||
141 | return Async::start<void>([this, key, entityType](Async::Future<void> &future) { | 142 | return Async::start<void>([this, key, entityType](Async::Future<void> &future) { |
142 | PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() { | 143 | PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() { |
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 22d7d97..7320e50 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -41,23 +41,14 @@ class QueuedCommand | |||
41 | public: | 41 | public: |
42 | QueuedCommand(int commandId, const std::function<void(int, const QString &)> &callback) | 42 | QueuedCommand(int commandId, const std::function<void(int, const QString &)> &callback) |
43 | : commandId(commandId), | 43 | : commandId(commandId), |
44 | bufferSize(0), | ||
45 | buffer(0), | ||
46 | callback(callback) | 44 | callback(callback) |
47 | {} | 45 | {} |
48 | 46 | ||
49 | QueuedCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void(int, const QString &)> &callback) | 47 | QueuedCommand(int commandId, const QByteArray &b, const std::function<void(int, const QString &)> &callback) |
50 | : commandId(commandId), | 48 | : commandId(commandId), |
51 | bufferSize(fbb.GetSize()), | 49 | buffer(b), |
52 | buffer(new char[bufferSize]), | ||
53 | callback(callback) | 50 | callback(callback) |
54 | { | 51 | { |
55 | memcpy(buffer, fbb.GetBufferPointer(), bufferSize); | ||
56 | } | ||
57 | |||
58 | ~QueuedCommand() | ||
59 | { | ||
60 | delete[] buffer; | ||
61 | } | 52 | } |
62 | 53 | ||
63 | private: | 54 | private: |
@@ -66,8 +57,7 @@ private: | |||
66 | 57 | ||
67 | public: | 58 | public: |
68 | const int commandId; | 59 | const int commandId; |
69 | const uint bufferSize; | 60 | QByteArray buffer; |
70 | char *buffer; | ||
71 | std::function<void(int, const QString &)> callback; | 61 | std::function<void(int, const QString &)> callback; |
72 | }; | 62 | }; |
73 | 63 | ||
@@ -159,7 +149,9 @@ Async::Job<void> ResourceAccess::sendCommand(int commandId) | |||
159 | 149 | ||
160 | Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) | 150 | Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) |
161 | { | 151 | { |
162 | return Async::start<void>([commandId, &fbb, this](Async::Future<void> &f) { | 152 | //The flatbuffer is transient, but we want to store it until the job is executed |
153 | QByteArray buffer(reinterpret_cast<const char*>(fbb.GetBufferPointer()), fbb.GetSize()); | ||
154 | return Async::start<void>([commandId, buffer, this](Async::Future<void> &f) { | ||
163 | auto callback = [&f](int error, const QString &errorMessage) { | 155 | auto callback = [&f](int error, const QString &errorMessage) { |
164 | if (error) { | 156 | if (error) { |
165 | f.setError(error, errorMessage); | 157 | f.setError(error, errorMessage); |
@@ -169,12 +161,13 @@ Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu | |||
169 | }; | 161 | }; |
170 | 162 | ||
171 | if (isReady()) { | 163 | if (isReady()) { |
164 | //TODO: We probably always want to queue the command, so we can resend it in case something goes wrong | ||
172 | d->messageId++; | 165 | d->messageId++; |
173 | log(QString("Sending command %1 with messageId %2").arg(commandId).arg(d->messageId)); | 166 | log(QString("Sending command %1 with messageId %2").arg(commandId).arg(d->messageId)); |
174 | registerCallback(d->messageId, callback); | 167 | registerCallback(d->messageId, callback); |
175 | Commands::write(d->socket, d->messageId, commandId, fbb); | 168 | Commands::write(d->socket, d->messageId, commandId, buffer.constData(), buffer.size()); |
176 | } else { | 169 | } else { |
177 | d->commandQueue << new QueuedCommand(commandId, fbb, callback); | 170 | d->commandQueue << new QueuedCommand(commandId, buffer, callback); |
178 | } | 171 | } |
179 | }); | 172 | }); |
180 | } | 173 | } |
@@ -237,7 +230,7 @@ void ResourceAccess::connected() | |||
237 | if (command->callback) { | 230 | if (command->callback) { |
238 | registerCallback(d->messageId, command->callback); | 231 | registerCallback(d->messageId, command->callback); |
239 | } | 232 | } |
240 | Commands::write(d->socket, d->messageId, command->commandId, command->buffer, command->bufferSize); | 233 | Commands::write(d->socket, d->messageId, command->commandId, command->buffer.constData(), command->buffer.size()); |
241 | delete command; | 234 | delete command; |
242 | } | 235 | } |
243 | d->commandQueue.clear(); | 236 | d->commandQueue.clear(); |
diff --git a/dummyresource/domainadaptor.cpp b/dummyresource/domainadaptor.cpp index d902052..2182f9a 100644 --- a/dummyresource/domainadaptor.cpp +++ b/dummyresource/domainadaptor.cpp | |||
@@ -100,13 +100,13 @@ QSharedPointer<Akonadi2::Domain::BufferAdaptor> DummyEventAdaptorFactory::create | |||
100 | } | 100 | } |
101 | } | 101 | } |
102 | 102 | ||
103 | Akonadi2::Metadata const *metadataBuffer = 0; | 103 | // Akonadi2::Metadata const *metadataBuffer = 0; |
104 | if (auto metadataData = entity.metadata()) { | 104 | // if (auto metadataData = entity.metadata()) { |
105 | flatbuffers::Verifier verifyer(metadataData->Data(), metadataData->size()); | 105 | // flatbuffers::Verifier verifyer(metadataData->Data(), metadataData->size()); |
106 | if (Akonadi2::VerifyMetadataBuffer(verifyer)) { | 106 | // if (Akonadi2::VerifyMetadataBuffer(verifyer)) { |
107 | metadataBuffer = Akonadi2::GetMetadata(metadataData->Data()); | 107 | // metadataBuffer = Akonadi2::GetMetadata(metadataData->Data()); |
108 | } | 108 | // } |
109 | } | 109 | // } |
110 | 110 | ||
111 | Akonadi2::Domain::Buffer::Event const *localBuffer = 0; | 111 | Akonadi2::Domain::Buffer::Event const *localBuffer = 0; |
112 | if (auto localData = entity.local()) { | 112 | if (auto localData = entity.local()) { |
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index 10c8eaf..6e06250 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp | |||
@@ -122,6 +122,17 @@ public: | |||
122 | signals: | 122 | signals: |
123 | void error(int errorCode, const QString &errorMessage); | 123 | void error(int errorCode, const QString &errorMessage); |
124 | 124 | ||
125 | private: | ||
126 | bool messagesToProcessAvailable() | ||
127 | { | ||
128 | for (auto queue : mCommandQueues) { | ||
129 | if (!queue->isEmpty()) { | ||
130 | return true; | ||
131 | } | ||
132 | } | ||
133 | return false; | ||
134 | } | ||
135 | |||
125 | private slots: | 136 | private slots: |
126 | void process() | 137 | void process() |
127 | { | 138 | { |
@@ -131,11 +142,15 @@ private slots: | |||
131 | mProcessingLock = true; | 142 | mProcessingLock = true; |
132 | auto job = processPipeline().then<void>([this]() { | 143 | auto job = processPipeline().then<void>([this]() { |
133 | mProcessingLock = false; | 144 | mProcessingLock = false; |
145 | if (messagesToProcessAvailable()) { | ||
146 | process(); | ||
147 | } | ||
134 | }).exec(); | 148 | }).exec(); |
135 | } | 149 | } |
136 | 150 | ||
137 | Async::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) | 151 | Async::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) |
138 | { | 152 | { |
153 | Log() << "Processing command: " << queuedCommand->commandId(); | ||
139 | //Throw command into appropriate pipeline | 154 | //Throw command into appropriate pipeline |
140 | switch (queuedCommand->commandId()) { | 155 | switch (queuedCommand->commandId()) { |
141 | case Akonadi2::Commands::DeleteEntityCommand: | 156 | case Akonadi2::Commands::DeleteEntityCommand: |
@@ -170,7 +185,7 @@ private slots: | |||
170 | 185 | ||
171 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size); | 186 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size); |
172 | if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { | 187 | if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { |
173 | qWarning() << "invalid buffer"; | 188 | Warning() << "invalid buffer"; |
174 | callback(false); | 189 | callback(false); |
175 | return; | 190 | return; |
176 | } | 191 | } |
@@ -185,13 +200,13 @@ private slots: | |||
185 | callback(true); | 200 | callback(true); |
186 | }, | 201 | }, |
187 | [callback](int errorCode, QString errorMessage) { | 202 | [callback](int errorCode, QString errorMessage) { |
188 | Warning() << errorMessage; | 203 | Warning() << "Error while processing queue command: " << errorMessage; |
189 | callback(false); | 204 | callback(false); |
190 | } | 205 | } |
191 | ).exec(); | 206 | ).exec(); |
192 | }, | 207 | }, |
193 | [&future](const MessageQueue::Error &error) { | 208 | [&future](const MessageQueue::Error &error) { |
194 | Warning() << error.message; | 209 | Warning() << "Error while getting message from messagequeue: " << error.message; |
195 | future.setValue(false); | 210 | future.setValue(false); |
196 | future.setFinished(); | 211 | future.setFinished(); |
197 | } | 212 | } |
@@ -209,6 +224,7 @@ private slots: | |||
209 | [it, this](Async::Future<void> &future) { | 224 | [it, this](Async::Future<void> &future) { |
210 | auto queue = it->next(); | 225 | auto queue = it->next(); |
211 | processQueue(queue).then<void>([&future]() { | 226 | processQueue(queue).then<void>([&future]() { |
227 | Trace() << "Queue processed"; | ||
212 | future.setFinished(); | 228 | future.setFinished(); |
213 | }).exec(); | 229 | }).exec(); |
214 | } | 230 | } |
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index 5165111..2e1e918 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp | |||
@@ -254,11 +254,13 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin | |||
254 | break; | 254 | break; |
255 | default: | 255 | default: |
256 | if (commandId > Akonadi2::Commands::CustomCommand) { | 256 | if (commandId > Akonadi2::Commands::CustomCommand) { |
257 | Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId; | ||
257 | loadResource(); | 258 | loadResource(); |
258 | if (m_resource) { | 259 | if (m_resource) { |
259 | m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); | 260 | m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); |
260 | } | 261 | } |
261 | } else { | 262 | } else { |
263 | Warning() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; | ||
262 | //TODO: handle error: we don't know wtf this command is | 264 | //TODO: handle error: we don't know wtf this command is |
263 | } | 265 | } |
264 | break; | 266 | break; |
@@ -376,7 +378,7 @@ void Listener::loadResource() | |||
376 | Log() << QString("\tFacades: %1").arg(Akonadi2::FacadeFactory::instance().getFacade<Akonadi2::Domain::Event>(m_resourceName)->type()); | 378 | Log() << QString("\tFacades: %1").arg(Akonadi2::FacadeFactory::instance().getFacade<Akonadi2::Domain::Event>(m_resourceName)->type()); |
377 | m_resource->configurePipeline(m_pipeline); | 379 | m_resource->configurePipeline(m_pipeline); |
378 | } else { | 380 | } else { |
379 | Warning() << QString("Failed to load resource %1").arg(m_resourceName); | 381 | Error() << QString("Failed to load resource %1").arg(m_resourceName); |
380 | } | 382 | } |
381 | //TODO: on failure ... what? | 383 | //TODO: on failure ... what? |
382 | //Enter broken state? | 384 | //Enter broken state? |
diff --git a/tests/domainadaptortest.cpp b/tests/domainadaptortest.cpp index d1a9d26..cedbf94 100644 --- a/tests/domainadaptortest.cpp +++ b/tests/domainadaptortest.cpp | |||
@@ -74,13 +74,13 @@ public: | |||
74 | } | 74 | } |
75 | } | 75 | } |
76 | 76 | ||
77 | Akonadi2::Metadata const *metadataBuffer = 0; | 77 | // Akonadi2::Metadata const *metadataBuffer = 0; |
78 | if (auto metadataData = entity.metadata()) { | 78 | // if (auto metadataData = entity.metadata()) { |
79 | flatbuffers::Verifier verifyer(metadataData->Data(), metadataData->size()); | 79 | // flatbuffers::Verifier verifyer(metadataData->Data(), metadataData->size()); |
80 | if (Akonadi2::VerifyMetadataBuffer(verifyer)) { | 80 | // if (Akonadi2::VerifyMetadataBuffer(verifyer)) { |
81 | metadataBuffer = Akonadi2::GetMetadata(metadataData); | 81 | // metadataBuffer = Akonadi2::GetMetadata(metadataData); |
82 | } | 82 | // } |
83 | } | 83 | // } |
84 | 84 | ||
85 | Akonadi2::Domain::Buffer::Event const *localBuffer = 0; | 85 | Akonadi2::Domain::Buffer::Event const *localBuffer = 0; |
86 | if (auto localData = entity.local()) { | 86 | if (auto localData = entity.local()) { |