summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/clientapi.cpp5
-rw-r--r--common/clientapi.h17
-rw-r--r--common/pipeline.cpp5
-rw-r--r--common/resourceaccess.cpp27
-rw-r--r--dummyresource/domainadaptor.cpp14
-rw-r--r--dummyresource/resourcefactory.cpp22
-rw-r--r--synchronizer/listener.cpp4
-rw-r--r--tests/domainadaptortest.cpp14
8 files changed, 63 insertions, 45 deletions
diff --git a/common/clientapi.cpp b/common/clientapi.cpp
index 10115da..6f0b421 100644
--- a/common/clientapi.cpp
+++ b/common/clientapi.cpp
@@ -36,10 +36,7 @@ void Store::shutdown(const QString &identifier)
36 Akonadi2::ResourceAccess resourceAccess(identifier); 36 Akonadi2::ResourceAccess resourceAccess(identifier);
37 //FIXME this starts the resource, just to shut it down again if it's not running in the first place. 37 //FIXME this starts the resource, just to shut it down again if it's not running in the first place.
38 resourceAccess.open(); 38 resourceAccess.open();
39 resourceAccess.sendCommand(Akonadi2::Commands::ShutdownCommand).then<void>([](Async::Future<void> &f){ 39 resourceAccess.sendCommand(Akonadi2::Commands::ShutdownCommand).exec().waitForFinished();
40 //TODO wait for disconnect
41 f.setFinished();
42 }).exec().waitForFinished();
43} 40}
44 41
45} // namespace Akonadi2 42} // namespace Akonadi2
diff --git a/common/clientapi.h b/common/clientapi.h
index aa3aab8..63305ab 100644
--- a/common/clientapi.h
+++ b/common/clientapi.h
@@ -425,13 +425,22 @@ public:
425 auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); 425 auto facade = FacadeFactory::instance().getFacade<DomainType>(resource);
426 //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive. 426 //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive.
427 std::function<void(const typename DomainType::Ptr &)> addCallback = std::bind(&ResultProvider<typename DomainType::Ptr>::add, resultSet, std::placeholders::_1); 427 std::function<void(const typename DomainType::Ptr &)> addCallback = std::bind(&ResultProvider<typename DomainType::Ptr>::add, resultSet, std::placeholders::_1);
428 //We copy the facade pointer to keep it alive 428
429 job = job.then(facade->load(query, addCallback)); 429 // TODO The following is a necessary hack to keep the facade alive.
430 // Otherwise this would reduce to:
431 // job = job.then(facade->load(query, addCallback));
432 // We somehow have to guarantee that the facade remains valid for the duration of the job
433 job = job.then<void>([facade, query, addCallback](Async::Future<void> &future) {
434 Async::Job<void> j = facade->load(query, addCallback);
435 j.then<void>([&future, facade](Async::Future<void> &f) {
436 future.setFinished();
437 f.setFinished();
438 }).exec();
439 });
430 } 440 }
431 job.then<void>([/* eventloop, */resultSet](Async::Future<void> &future) { 441 job.then<void>([resultSet]() {
432 qDebug() << "Query complete"; 442 qDebug() << "Query complete";
433 resultSet->complete(); 443 resultSet->complete();
434 future.setFinished();
435 }).exec().waitForFinished(); //We use the eventloop provided by waitForFinished to keep the thread alive until all is done 444 }).exec().waitForFinished(); //We use the eventloop provided by waitForFinished to keep the thread alive until all is done
436 }); 445 });
437 return resultSet->emitter(); 446 return resultSet->emitter();
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index f321cf5..ed40699 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -30,6 +30,7 @@
30#include "createentity_generated.h" 30#include "createentity_generated.h"
31#include "entitybuffer.h" 31#include "entitybuffer.h"
32#include "async/src/async.h" 32#include "async/src/async.h"
33#include "log.h"
33 34
34namespace Akonadi2 35namespace Akonadi2
35{ 36{
@@ -95,7 +96,7 @@ void Pipeline::null()
95 96
96Async::Job<void> Pipeline::newEntity(void const *command, size_t size) 97Async::Job<void> Pipeline::newEntity(void const *command, size_t size)
97{ 98{
98 qDebug() << "Pipeline: New Entity"; 99 Log() << "Pipeline: New Entity";
99 100
100 //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. 101 //TODO toRFC4122 would probably be more efficient, but results in non-printable keys.
101 const auto key = QUuid::createUuid().toString().toUtf8(); 102 const auto key = QUuid::createUuid().toString().toUtf8();
@@ -136,7 +137,7 @@ Async::Job<void> Pipeline::newEntity(void const *command, size_t size)
136 137
137 storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); 138 storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize());
138 storage().setMaxRevision(newRevision); 139 storage().setMaxRevision(newRevision);
139 qDebug() << "Pipeline: wrote entity: "<< newRevision; 140 Log() << "Pipeline: wrote entity: "<< newRevision;
140 141
141 return Async::start<void>([this, key, entityType](Async::Future<void> &future) { 142 return Async::start<void>([this, key, entityType](Async::Future<void> &future) {
142 PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() { 143 PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() {
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 22d7d97..7320e50 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -41,23 +41,14 @@ class QueuedCommand
41public: 41public:
42 QueuedCommand(int commandId, const std::function<void(int, const QString &)> &callback) 42 QueuedCommand(int commandId, const std::function<void(int, const QString &)> &callback)
43 : commandId(commandId), 43 : commandId(commandId),
44 bufferSize(0),
45 buffer(0),
46 callback(callback) 44 callback(callback)
47 {} 45 {}
48 46
49 QueuedCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void(int, const QString &)> &callback) 47 QueuedCommand(int commandId, const QByteArray &b, const std::function<void(int, const QString &)> &callback)
50 : commandId(commandId), 48 : commandId(commandId),
51 bufferSize(fbb.GetSize()), 49 buffer(b),
52 buffer(new char[bufferSize]),
53 callback(callback) 50 callback(callback)
54 { 51 {
55 memcpy(buffer, fbb.GetBufferPointer(), bufferSize);
56 }
57
58 ~QueuedCommand()
59 {
60 delete[] buffer;
61 } 52 }
62 53
63private: 54private:
@@ -66,8 +57,7 @@ private:
66 57
67public: 58public:
68 const int commandId; 59 const int commandId;
69 const uint bufferSize; 60 QByteArray buffer;
70 char *buffer;
71 std::function<void(int, const QString &)> callback; 61 std::function<void(int, const QString &)> callback;
72}; 62};
73 63
@@ -159,7 +149,9 @@ Async::Job<void> ResourceAccess::sendCommand(int commandId)
159 149
160Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) 150Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb)
161{ 151{
162 return Async::start<void>([commandId, &fbb, this](Async::Future<void> &f) { 152 //The flatbuffer is transient, but we want to store it until the job is executed
153 QByteArray buffer(reinterpret_cast<const char*>(fbb.GetBufferPointer()), fbb.GetSize());
154 return Async::start<void>([commandId, buffer, this](Async::Future<void> &f) {
163 auto callback = [&f](int error, const QString &errorMessage) { 155 auto callback = [&f](int error, const QString &errorMessage) {
164 if (error) { 156 if (error) {
165 f.setError(error, errorMessage); 157 f.setError(error, errorMessage);
@@ -169,12 +161,13 @@ Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu
169 }; 161 };
170 162
171 if (isReady()) { 163 if (isReady()) {
164 //TODO: We probably always want to queue the command, so we can resend it in case something goes wrong
172 d->messageId++; 165 d->messageId++;
173 log(QString("Sending command %1 with messageId %2").arg(commandId).arg(d->messageId)); 166 log(QString("Sending command %1 with messageId %2").arg(commandId).arg(d->messageId));
174 registerCallback(d->messageId, callback); 167 registerCallback(d->messageId, callback);
175 Commands::write(d->socket, d->messageId, commandId, fbb); 168 Commands::write(d->socket, d->messageId, commandId, buffer.constData(), buffer.size());
176 } else { 169 } else {
177 d->commandQueue << new QueuedCommand(commandId, fbb, callback); 170 d->commandQueue << new QueuedCommand(commandId, buffer, callback);
178 } 171 }
179 }); 172 });
180} 173}
@@ -237,7 +230,7 @@ void ResourceAccess::connected()
237 if (command->callback) { 230 if (command->callback) {
238 registerCallback(d->messageId, command->callback); 231 registerCallback(d->messageId, command->callback);
239 } 232 }
240 Commands::write(d->socket, d->messageId, command->commandId, command->buffer, command->bufferSize); 233 Commands::write(d->socket, d->messageId, command->commandId, command->buffer.constData(), command->buffer.size());
241 delete command; 234 delete command;
242 } 235 }
243 d->commandQueue.clear(); 236 d->commandQueue.clear();
diff --git a/dummyresource/domainadaptor.cpp b/dummyresource/domainadaptor.cpp
index d902052..2182f9a 100644
--- a/dummyresource/domainadaptor.cpp
+++ b/dummyresource/domainadaptor.cpp
@@ -100,13 +100,13 @@ QSharedPointer<Akonadi2::Domain::BufferAdaptor> DummyEventAdaptorFactory::create
100 } 100 }
101 } 101 }
102 102
103 Akonadi2::Metadata const *metadataBuffer = 0; 103 // Akonadi2::Metadata const *metadataBuffer = 0;
104 if (auto metadataData = entity.metadata()) { 104 // if (auto metadataData = entity.metadata()) {
105 flatbuffers::Verifier verifyer(metadataData->Data(), metadataData->size()); 105 // flatbuffers::Verifier verifyer(metadataData->Data(), metadataData->size());
106 if (Akonadi2::VerifyMetadataBuffer(verifyer)) { 106 // if (Akonadi2::VerifyMetadataBuffer(verifyer)) {
107 metadataBuffer = Akonadi2::GetMetadata(metadataData->Data()); 107 // metadataBuffer = Akonadi2::GetMetadata(metadataData->Data());
108 } 108 // }
109 } 109 // }
110 110
111 Akonadi2::Domain::Buffer::Event const *localBuffer = 0; 111 Akonadi2::Domain::Buffer::Event const *localBuffer = 0;
112 if (auto localData = entity.local()) { 112 if (auto localData = entity.local()) {
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp
index 10c8eaf..6e06250 100644
--- a/dummyresource/resourcefactory.cpp
+++ b/dummyresource/resourcefactory.cpp
@@ -122,6 +122,17 @@ public:
122signals: 122signals:
123 void error(int errorCode, const QString &errorMessage); 123 void error(int errorCode, const QString &errorMessage);
124 124
125private:
126 bool messagesToProcessAvailable()
127 {
128 for (auto queue : mCommandQueues) {
129 if (!queue->isEmpty()) {
130 return true;
131 }
132 }
133 return false;
134 }
135
125private slots: 136private slots:
126 void process() 137 void process()
127 { 138 {
@@ -131,11 +142,15 @@ private slots:
131 mProcessingLock = true; 142 mProcessingLock = true;
132 auto job = processPipeline().then<void>([this]() { 143 auto job = processPipeline().then<void>([this]() {
133 mProcessingLock = false; 144 mProcessingLock = false;
145 if (messagesToProcessAvailable()) {
146 process();
147 }
134 }).exec(); 148 }).exec();
135 } 149 }
136 150
137 Async::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) 151 Async::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand)
138 { 152 {
153 Log() << "Processing command: " << queuedCommand->commandId();
139 //Throw command into appropriate pipeline 154 //Throw command into appropriate pipeline
140 switch (queuedCommand->commandId()) { 155 switch (queuedCommand->commandId()) {
141 case Akonadi2::Commands::DeleteEntityCommand: 156 case Akonadi2::Commands::DeleteEntityCommand:
@@ -170,7 +185,7 @@ private slots:
170 185
171 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size); 186 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size);
172 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { 187 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) {
173 qWarning() << "invalid buffer"; 188 Warning() << "invalid buffer";
174 callback(false); 189 callback(false);
175 return; 190 return;
176 } 191 }
@@ -185,13 +200,13 @@ private slots:
185 callback(true); 200 callback(true);
186 }, 201 },
187 [callback](int errorCode, QString errorMessage) { 202 [callback](int errorCode, QString errorMessage) {
188 Warning() << errorMessage; 203 Warning() << "Error while processing queue command: " << errorMessage;
189 callback(false); 204 callback(false);
190 } 205 }
191 ).exec(); 206 ).exec();
192 }, 207 },
193 [&future](const MessageQueue::Error &error) { 208 [&future](const MessageQueue::Error &error) {
194 Warning() << error.message; 209 Warning() << "Error while getting message from messagequeue: " << error.message;
195 future.setValue(false); 210 future.setValue(false);
196 future.setFinished(); 211 future.setFinished();
197 } 212 }
@@ -209,6 +224,7 @@ private slots:
209 [it, this](Async::Future<void> &future) { 224 [it, this](Async::Future<void> &future) {
210 auto queue = it->next(); 225 auto queue = it->next();
211 processQueue(queue).then<void>([&future]() { 226 processQueue(queue).then<void>([&future]() {
227 Trace() << "Queue processed";
212 future.setFinished(); 228 future.setFinished();
213 }).exec(); 229 }).exec();
214 } 230 }
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp
index 5165111..2e1e918 100644
--- a/synchronizer/listener.cpp
+++ b/synchronizer/listener.cpp
@@ -254,11 +254,13 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin
254 break; 254 break;
255 default: 255 default:
256 if (commandId > Akonadi2::Commands::CustomCommand) { 256 if (commandId > Akonadi2::Commands::CustomCommand) {
257 Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId;
257 loadResource(); 258 loadResource();
258 if (m_resource) { 259 if (m_resource) {
259 m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); 260 m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline);
260 } 261 }
261 } else { 262 } else {
263 Warning() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId;
262 //TODO: handle error: we don't know wtf this command is 264 //TODO: handle error: we don't know wtf this command is
263 } 265 }
264 break; 266 break;
@@ -376,7 +378,7 @@ void Listener::loadResource()
376 Log() << QString("\tFacades: %1").arg(Akonadi2::FacadeFactory::instance().getFacade<Akonadi2::Domain::Event>(m_resourceName)->type()); 378 Log() << QString("\tFacades: %1").arg(Akonadi2::FacadeFactory::instance().getFacade<Akonadi2::Domain::Event>(m_resourceName)->type());
377 m_resource->configurePipeline(m_pipeline); 379 m_resource->configurePipeline(m_pipeline);
378 } else { 380 } else {
379 Warning() << QString("Failed to load resource %1").arg(m_resourceName); 381 Error() << QString("Failed to load resource %1").arg(m_resourceName);
380 } 382 }
381 //TODO: on failure ... what? 383 //TODO: on failure ... what?
382 //Enter broken state? 384 //Enter broken state?
diff --git a/tests/domainadaptortest.cpp b/tests/domainadaptortest.cpp
index d1a9d26..cedbf94 100644
--- a/tests/domainadaptortest.cpp
+++ b/tests/domainadaptortest.cpp
@@ -74,13 +74,13 @@ public:
74 } 74 }
75 } 75 }
76 76
77 Akonadi2::Metadata const *metadataBuffer = 0; 77 // Akonadi2::Metadata const *metadataBuffer = 0;
78 if (auto metadataData = entity.metadata()) { 78 // if (auto metadataData = entity.metadata()) {
79 flatbuffers::Verifier verifyer(metadataData->Data(), metadataData->size()); 79 // flatbuffers::Verifier verifyer(metadataData->Data(), metadataData->size());
80 if (Akonadi2::VerifyMetadataBuffer(verifyer)) { 80 // if (Akonadi2::VerifyMetadataBuffer(verifyer)) {
81 metadataBuffer = Akonadi2::GetMetadata(metadataData); 81 // metadataBuffer = Akonadi2::GetMetadata(metadataData);
82 } 82 // }
83 } 83 // }
84 84
85 Akonadi2::Domain::Buffer::Event const *localBuffer = 0; 85 Akonadi2::Domain::Buffer::Event const *localBuffer = 0;
86 if (auto localData = entity.local()) { 86 if (auto localData = entity.local()) {