summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-01-15 23:08:34 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-01-15 23:08:34 +0100
commit0faf38f2ad9672fb46c77cae7317f44c72ebd10e (patch)
tree8776253e72d83b4f824631618af5c7393f9d964d
parenta0f517df8c4633d33820186954bdf803941e92bf (diff)
downloadsink-0faf38f2ad9672fb46c77cae7317f44c72ebd10e.tar.gz
sink-0faf38f2ad9672fb46c77cae7317f44c72ebd10e.zip
Async message queue processing.
The Job/Future in Pipeline::newEntity for some reason crashes with async pipeline processing.
-rw-r--r--common/pipeline.cpp79
-rw-r--r--dummyresource/resourcefactory.cpp85
-rw-r--r--dummyresource/resourcefactory.h1
-rw-r--r--tests/dummyresourcetest.cpp19
4 files changed, 113 insertions, 71 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 9cc7450..339a39c 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -95,44 +95,49 @@ void Pipeline::null()
95 95
96Async::Job<void> Pipeline::newEntity(void const *command, size_t size) 96Async::Job<void> Pipeline::newEntity(void const *command, size_t size)
97{ 97{
98 qDebug() << "new entity"; 98 qDebug() << "new entity" << size;
99 Async::start<void>([&](Async::Future<void> future) {
100
101 //TODO toRFC4122 would probably be more efficient, but results in non-printable keys.
102 const auto key = QUuid::createUuid().toString().toUtf8();
103
104 //TODO figure out if we already have created a revision for the message?
105 const qint64 newRevision = storage().maxRevision() + 1;
106
107 auto createEntity = Akonadi2::Commands::GetCreateEntity(command);
108 //TODO rename createEntitiy->domainType to bufferType
109 const QString entityType = QString::fromUtf8(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size());
110 auto entity = Akonadi2::GetEntity(createEntity->delta()->Data());
111 //
112 // const QString entityType;
113 // auto entity = Akonadi2::GetEntity(0);
114
115 //Add metadata buffer
116 flatbuffers::FlatBufferBuilder metadataFbb;
117 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb);
118 metadataBuilder.add_revision(newRevision);
119 metadataBuilder.add_processed(false);
120 auto metadataBuffer = metadataBuilder.Finish();
121 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer);
122 //TODO we should reserve some space in metadata for in-place updates
123
124 flatbuffers::FlatBufferBuilder fbb;
125 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
126
127 storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize());
128 storage().setMaxRevision(newRevision);
129 99
100 //TODO toRFC4122 would probably be more efficient, but results in non-printable keys.
101 const auto key = QUuid::createUuid().toString().toUtf8();
102
103 //TODO figure out if we already have created a revision for the message?
104 const qint64 newRevision = storage().maxRevision() + 1;
105
106 {
107 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
108 if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) {
109 qWarning() << "invalid buffer";
110 return Async::null<void>();
111 }
112 }
113
114 auto createEntity = Akonadi2::Commands::GetCreateEntity(command);
115
116 //TODO rename createEntitiy->domainType to bufferType
117 const QString entityType = QString::fromUtf8(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size());
118 auto entity = Akonadi2::GetEntity(createEntity->delta()->Data());
119
120 //Add metadata buffer
121 flatbuffers::FlatBufferBuilder metadataFbb;
122 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb);
123 metadataBuilder.add_revision(newRevision);
124 metadataBuilder.add_processed(false);
125 auto metadataBuffer = metadataBuilder.Finish();
126 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer);
127 //TODO we should reserve some space in metadata for in-place updates
128
129 flatbuffers::FlatBufferBuilder fbb;
130 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
131
132 storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize());
133 storage().setMaxRevision(newRevision);
134
135 return Async::start<void>([this, key, entityType](Async::Future<void> &future) {
130 PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() { 136 PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() {
131 future.setFinished(); 137 future.setFinished();
132 }); 138 });
133 d->activePipelines << state; 139 d->activePipelines << state;
134 state.step(); 140 state.step();
135
136 }); 141 });
137} 142}
138 143
@@ -157,10 +162,12 @@ void Pipeline::pipelineStepped(const PipelineState &state)
157 162
158void Pipeline::scheduleStep() 163void Pipeline::scheduleStep()
159{ 164{
160 if (!d->stepScheduled) { 165 // if (!d->stepScheduled) {
161 d->stepScheduled = true; 166 // d->stepScheduled = true;
162 QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); 167 // QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection);
163 } 168 // }
169 //FIXME make async again. For some reason the job in newEntity crashes if pipeline processing is async.
170 stepPipelines();
164} 171}
165 172
166void Pipeline::stepPipelines() 173void Pipeline::stepPipelines()
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp
index e14aa01..b43e4a3 100644
--- a/dummyresource/resourcefactory.cpp
+++ b/dummyresource/resourcefactory.cpp
@@ -116,26 +116,40 @@ public:
116 } 116 }
117 117
118private slots: 118private slots:
119 static void asyncWhile(const std::function<void(std::function<void(bool)>)> &body, const std::function<void()> &completionHandler) {
120 body([body, completionHandler](bool complete) {
121 if (complete) {
122 completionHandler();
123 } else {
124 asyncWhile(body, completionHandler);
125 }
126 });
127 }
128
119 void process() 129 void process()
120 { 130 {
121 if (mProcessingLock) { 131 if (mProcessingLock) {
122 return; 132 return;
123 } 133 }
124 mProcessingLock = true; 134 mProcessingLock = true;
125 //Empty queue after queue 135 auto job = processPipeline().then<void>([this](Async::Future<void> &future) {
126 //FIXME the for and while loops should be async, otherwise we process all messages in parallel 136 mProcessingLock = false;
127 for (auto queue : mCommandQueues) { 137 future.setFinished();
128 qDebug() << "processing queue"; 138 }).exec();
129 bool processedMessage = false; 139 }
130 while (processedMessage) { 140
131 qDebug() << "process"; 141 Async::Job<void> processPipeline()
132 processedMessage = false; 142 {
133 queue->dequeue([this, &processedMessage](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) { 143 auto job = Async::start<void>([this](Async::Future<void> &future) {
144 //TODO process all queues in async for
145 auto queue = mCommandQueues.first();
146 asyncWhile([&](std::function<void(bool)> whileCallback) {
147 queue->dequeue([this, whileCallback](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) {
134 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size); 148 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size);
135 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { 149 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) {
136 qWarning() << "invalid buffer"; 150 qWarning() << "invalid buffer";
137 processedMessage = false;
138 messageQueueCallback(false); 151 messageQueueCallback(false);
152 whileCallback(true);
139 return; 153 return;
140 } 154 }
141 auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); 155 auto queuedCommand = Akonadi2::GetQueuedCommand(ptr);
@@ -150,26 +164,32 @@ private slots:
150 break; 164 break;
151 case Akonadi2::Commands::CreateEntityCommand: { 165 case Akonadi2::Commands::CreateEntityCommand: {
152 //TODO job lifetime management 166 //TODO job lifetime management
153 auto job = mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<void>([&messageQueueCallback](Async::Future<void> future) { 167 mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<void>([&messageQueueCallback, whileCallback](Async::Future<void> &future) {
154 messageQueueCallback(true); 168 messageQueueCallback(true);
155 }); 169 whileCallback(false);
156 job.exec(); 170 future.setFinished();
171 }).exec();
157 } 172 }
158 break; 173 break;
159 default: 174 default:
160 //Unhandled command 175 //Unhandled command
161 qWarning() << "Unhandled command"; 176 qWarning() << "Unhandled command";
162 messageQueueCallback(true); 177 messageQueueCallback(true);
178 whileCallback(false);
163 break; 179 break;
164 } 180 }
165 processedMessage = true;
166 }, 181 },
167 [&processedMessage](const MessageQueue::Error &error) { 182 [whileCallback](const MessageQueue::Error &error) {
168 processedMessage = false; 183 qDebug() << "no more messages in queue";
184 whileCallback(true);
169 }); 185 });
170 } 186 },
171 } 187 [&future]() { //while complete
172 mProcessingLock = false; 188 future.setFinished();
189 //Call async-for completion handler
190 });
191 });
192 return job;
173 } 193 }
174 194
175private: 195private:
@@ -226,6 +246,18 @@ void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &ri
226 }); 246 });
227} 247}
228 248
249void DummyResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data)
250{
251 m_fbb.Clear();
252 auto commandData = m_fbb.CreateVector(reinterpret_cast<uint8_t const *>(data.data()), data.size());
253 auto builder = Akonadi2::QueuedCommandBuilder(m_fbb);
254 builder.add_commandId(commandId);
255 builder.add_command(commandData);
256 auto buffer = builder.Finish();
257 Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer);
258 mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize());
259}
260
229Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) 261Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline)
230{ 262{
231 qDebug() << "synchronizeWithSource"; 263 qDebug() << "synchronizeWithSource";
@@ -259,11 +291,7 @@ Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli
259 builder.add_attachment(attachment); 291 builder.add_attachment(attachment);
260 auto buffer = builder.Finish(); 292 auto buffer = builder.Finish();
261 DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); 293 DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer);
262 //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. 294 enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, QByteArray::fromRawData(reinterpret_cast<char const *>(m_fbb.GetBufferPointer()), m_fbb.GetSize()));
263 const auto key = QUuid::createUuid().toString().toUtf8();
264 //TODO Create queuedcommand and push into synchronizerQueue
265 //* create message in store directly, add command to messagequeue waiting for processing.
266 // pipeline->newEntity<Akonadi2::Domain::Event>(key, m_fbb.GetBufferPointer(), m_fbb.GetSize());
267 } else { //modification 295 } else { //modification
268 //TODO diff and create modification if necessary 296 //TODO diff and create modification if necessary
269 } 297 }
@@ -279,14 +307,7 @@ void DummyResource::processCommand(int commandId, const QByteArray &data, uint s
279 //TODO instead of copying the command including the full entity first into the command queue, we could directly 307 //TODO instead of copying the command including the full entity first into the command queue, we could directly
280 //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). 308 //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay).
281 //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire). 309 //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire).
282 m_fbb.Clear(); 310 enqueueCommand(mUserQueue, commandId, data);
283 auto commandData = m_fbb.CreateVector(reinterpret_cast<uint8_t const *>(data.data()), data.size());
284 auto builder = Akonadi2::QueuedCommandBuilder(m_fbb);
285 builder.add_commandId(commandId);
286 builder.add_command(commandData);
287 auto buffer = builder.Finish();
288 Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer);
289 mUserQueue.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize());
290} 311}
291 312
292DummyResourceFactory::DummyResourceFactory(QObject *parent) 313DummyResourceFactory::DummyResourceFactory(QObject *parent)
diff --git a/dummyresource/resourcefactory.h b/dummyresource/resourcefactory.h
index 682f25c..6043fb6 100644
--- a/dummyresource/resourcefactory.h
+++ b/dummyresource/resourcefactory.h
@@ -39,6 +39,7 @@ public:
39 void configurePipeline(Akonadi2::Pipeline *pipeline); 39 void configurePipeline(Akonadi2::Pipeline *pipeline);
40 40
41private: 41private:
42 void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data);
42 flatbuffers::FlatBufferBuilder m_fbb; 43 flatbuffers::FlatBufferBuilder m_fbb;
43 MessageQueue mUserQueue; 44 MessageQueue mUserQueue;
44 MessageQueue mSynchronizerQueue; 45 MessageQueue mSynchronizerQueue;
diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp
index ddb59a5..c469796 100644
--- a/tests/dummyresourcetest.cpp
+++ b/tests/dummyresourcetest.cpp
@@ -14,7 +14,7 @@
14 14
15static void removeFromDisk(const QString &name) 15static void removeFromDisk(const QString &name)
16{ 16{
17 Akonadi2::Storage store(Akonadi2::Store::storageLocation(), "org.kde.dummy", Akonadi2::Storage::ReadWrite); 17 Akonadi2::Storage store(Akonadi2::Store::storageLocation(), name, Akonadi2::Storage::ReadWrite);
18 store.removeFromDisk(); 18 store.removeFromDisk();
19} 19}
20 20
@@ -33,6 +33,9 @@ private Q_SLOTS:
33 33
34 void cleanupTestCase() 34 void cleanupTestCase()
35 { 35 {
36 removeFromDisk("org.kde.dummy");
37 removeFromDisk("org.kde.dummy.userqueue");
38 removeFromDisk("org.kde.dummy.synchronizerqueue");
36 } 39 }
37 40
38 void testProcessCommand() 41 void testProcessCommand()
@@ -60,13 +63,23 @@ private Q_SLOTS:
60 Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); 63 Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location);
61 64
62 const QByteArray command(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize()); 65 const QByteArray command(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize());
66 {
67 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command.data()), command.size());
68 QVERIFY(Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer));
69 }
63 70
71 //Actual test
64 Akonadi2::Pipeline pipeline("org.kde.dummy"); 72 Akonadi2::Pipeline pipeline("org.kde.dummy");
73 QSignalSpy revisionSpy(&pipeline, SIGNAL(revisionUpdated()));
65 DummyResource resource; 74 DummyResource resource;
66 resource.configurePipeline(&pipeline); 75 resource.configurePipeline(&pipeline);
67 resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, command.size(), &pipeline); 76 resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, command.size(), &pipeline);
68 //TODO wait until the pipeline has processed the command 77 resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, command.size(), &pipeline);
69 QTest::qWait(1000); 78
79 QVERIFY(revisionSpy.isValid());
80 QTRY_COMPARE(revisionSpy.count(), 2);
81 QTest::qWait(100);
82 QCOMPARE(revisionSpy.count(), 2);
70 } 83 }
71 84
72 // void testResourceSync() 85 // void testResourceSync()