summaryrefslogtreecommitdiffstats
path: root/dummyresource/resourcefactory.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'dummyresource/resourcefactory.cpp')
-rw-r--r--dummyresource/resourcefactory.cpp85
1 files changed, 53 insertions, 32 deletions
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp
index e14aa01..b43e4a3 100644
--- a/dummyresource/resourcefactory.cpp
+++ b/dummyresource/resourcefactory.cpp
@@ -116,26 +116,40 @@ public:
116 } 116 }
117 117
118private slots: 118private slots:
119 static void asyncWhile(const std::function<void(std::function<void(bool)>)> &body, const std::function<void()> &completionHandler) {
120 body([body, completionHandler](bool complete) {
121 if (complete) {
122 completionHandler();
123 } else {
124 asyncWhile(body, completionHandler);
125 }
126 });
127 }
128
119 void process() 129 void process()
120 { 130 {
121 if (mProcessingLock) { 131 if (mProcessingLock) {
122 return; 132 return;
123 } 133 }
124 mProcessingLock = true; 134 mProcessingLock = true;
125 //Empty queue after queue 135 auto job = processPipeline().then<void>([this](Async::Future<void> &future) {
126 //FIXME the for and while loops should be async, otherwise we process all messages in parallel 136 mProcessingLock = false;
127 for (auto queue : mCommandQueues) { 137 future.setFinished();
128 qDebug() << "processing queue"; 138 }).exec();
129 bool processedMessage = false; 139 }
130 while (processedMessage) { 140
131 qDebug() << "process"; 141 Async::Job<void> processPipeline()
132 processedMessage = false; 142 {
133 queue->dequeue([this, &processedMessage](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) { 143 auto job = Async::start<void>([this](Async::Future<void> &future) {
144 //TODO process all queues in async for
145 auto queue = mCommandQueues.first();
146 asyncWhile([&](std::function<void(bool)> whileCallback) {
147 queue->dequeue([this, whileCallback](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) {
134 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size); 148 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size);
135 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { 149 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) {
136 qWarning() << "invalid buffer"; 150 qWarning() << "invalid buffer";
137 processedMessage = false;
138 messageQueueCallback(false); 151 messageQueueCallback(false);
152 whileCallback(true);
139 return; 153 return;
140 } 154 }
141 auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); 155 auto queuedCommand = Akonadi2::GetQueuedCommand(ptr);
@@ -150,26 +164,32 @@ private slots:
150 break; 164 break;
151 case Akonadi2::Commands::CreateEntityCommand: { 165 case Akonadi2::Commands::CreateEntityCommand: {
152 //TODO job lifetime management 166 //TODO job lifetime management
153 auto job = mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<void>([&messageQueueCallback](Async::Future<void> future) { 167 mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<void>([&messageQueueCallback, whileCallback](Async::Future<void> &future) {
154 messageQueueCallback(true); 168 messageQueueCallback(true);
155 }); 169 whileCallback(false);
156 job.exec(); 170 future.setFinished();
171 }).exec();
157 } 172 }
158 break; 173 break;
159 default: 174 default:
160 //Unhandled command 175 //Unhandled command
161 qWarning() << "Unhandled command"; 176 qWarning() << "Unhandled command";
162 messageQueueCallback(true); 177 messageQueueCallback(true);
178 whileCallback(false);
163 break; 179 break;
164 } 180 }
165 processedMessage = true;
166 }, 181 },
167 [&processedMessage](const MessageQueue::Error &error) { 182 [whileCallback](const MessageQueue::Error &error) {
168 processedMessage = false; 183 qDebug() << "no more messages in queue";
184 whileCallback(true);
169 }); 185 });
170 } 186 },
171 } 187 [&future]() { //while complete
172 mProcessingLock = false; 188 future.setFinished();
189 //Call async-for completion handler
190 });
191 });
192 return job;
173 } 193 }
174 194
175private: 195private:
@@ -226,6 +246,18 @@ void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &ri
226 }); 246 });
227} 247}
228 248
249void DummyResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data)
250{
251 m_fbb.Clear();
252 auto commandData = m_fbb.CreateVector(reinterpret_cast<uint8_t const *>(data.data()), data.size());
253 auto builder = Akonadi2::QueuedCommandBuilder(m_fbb);
254 builder.add_commandId(commandId);
255 builder.add_command(commandData);
256 auto buffer = builder.Finish();
257 Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer);
258 mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize());
259}
260
229Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) 261Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline)
230{ 262{
231 qDebug() << "synchronizeWithSource"; 263 qDebug() << "synchronizeWithSource";
@@ -259,11 +291,7 @@ Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli
259 builder.add_attachment(attachment); 291 builder.add_attachment(attachment);
260 auto buffer = builder.Finish(); 292 auto buffer = builder.Finish();
261 DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); 293 DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer);
262 //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. 294 enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, QByteArray::fromRawData(reinterpret_cast<char const *>(m_fbb.GetBufferPointer()), m_fbb.GetSize()));
263 const auto key = QUuid::createUuid().toString().toUtf8();
264 //TODO Create queuedcommand and push into synchronizerQueue
265 //* create message in store directly, add command to messagequeue waiting for processing.
266 // pipeline->newEntity<Akonadi2::Domain::Event>(key, m_fbb.GetBufferPointer(), m_fbb.GetSize());
267 } else { //modification 295 } else { //modification
268 //TODO diff and create modification if necessary 296 //TODO diff and create modification if necessary
269 } 297 }
@@ -279,14 +307,7 @@ void DummyResource::processCommand(int commandId, const QByteArray &data, uint s
279 //TODO instead of copying the command including the full entity first into the command queue, we could directly 307 //TODO instead of copying the command including the full entity first into the command queue, we could directly
280 //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). 308 //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay).
281 //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire). 309 //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire).
282 m_fbb.Clear(); 310 enqueueCommand(mUserQueue, commandId, data);
283 auto commandData = m_fbb.CreateVector(reinterpret_cast<uint8_t const *>(data.data()), data.size());
284 auto builder = Akonadi2::QueuedCommandBuilder(m_fbb);
285 builder.add_commandId(commandId);
286 builder.add_command(commandData);
287 auto buffer = builder.Finish();
288 Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer);
289 mUserQueue.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize());
290} 311}
291 312
292DummyResourceFactory::DummyResourceFactory(QObject *parent) 313DummyResourceFactory::DummyResourceFactory(QObject *parent)