diff options
-rw-r--r-- | common/pipeline.cpp | 79 | ||||
-rw-r--r-- | dummyresource/resourcefactory.cpp | 85 | ||||
-rw-r--r-- | dummyresource/resourcefactory.h | 1 | ||||
-rw-r--r-- | tests/dummyresourcetest.cpp | 19 |
4 files changed, 113 insertions, 71 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 9cc7450..339a39c 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -95,44 +95,49 @@ void Pipeline::null() | |||
95 | 95 | ||
96 | Async::Job<void> Pipeline::newEntity(void const *command, size_t size) | 96 | Async::Job<void> Pipeline::newEntity(void const *command, size_t size) |
97 | { | 97 | { |
98 | qDebug() << "new entity"; | 98 | qDebug() << "new entity" << size; |
99 | Async::start<void>([&](Async::Future<void> future) { | ||
100 | |||
101 | //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. | ||
102 | const auto key = QUuid::createUuid().toString().toUtf8(); | ||
103 | |||
104 | //TODO figure out if we already have created a revision for the message? | ||
105 | const qint64 newRevision = storage().maxRevision() + 1; | ||
106 | |||
107 | auto createEntity = Akonadi2::Commands::GetCreateEntity(command); | ||
108 | //TODO rename createEntitiy->domainType to bufferType | ||
109 | const QString entityType = QString::fromUtf8(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size()); | ||
110 | auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); | ||
111 | // | ||
112 | // const QString entityType; | ||
113 | // auto entity = Akonadi2::GetEntity(0); | ||
114 | |||
115 | //Add metadata buffer | ||
116 | flatbuffers::FlatBufferBuilder metadataFbb; | ||
117 | auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); | ||
118 | metadataBuilder.add_revision(newRevision); | ||
119 | metadataBuilder.add_processed(false); | ||
120 | auto metadataBuffer = metadataBuilder.Finish(); | ||
121 | Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
122 | //TODO we should reserve some space in metadata for in-place updates | ||
123 | |||
124 | flatbuffers::FlatBufferBuilder fbb; | ||
125 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); | ||
126 | |||
127 | storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); | ||
128 | storage().setMaxRevision(newRevision); | ||
129 | 99 | ||
100 | //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. | ||
101 | const auto key = QUuid::createUuid().toString().toUtf8(); | ||
102 | |||
103 | //TODO figure out if we already have created a revision for the message? | ||
104 | const qint64 newRevision = storage().maxRevision() + 1; | ||
105 | |||
106 | { | ||
107 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | ||
108 | if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) { | ||
109 | qWarning() << "invalid buffer"; | ||
110 | return Async::null<void>(); | ||
111 | } | ||
112 | } | ||
113 | |||
114 | auto createEntity = Akonadi2::Commands::GetCreateEntity(command); | ||
115 | |||
116 | //TODO rename createEntitiy->domainType to bufferType | ||
117 | const QString entityType = QString::fromUtf8(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size()); | ||
118 | auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); | ||
119 | |||
120 | //Add metadata buffer | ||
121 | flatbuffers::FlatBufferBuilder metadataFbb; | ||
122 | auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); | ||
123 | metadataBuilder.add_revision(newRevision); | ||
124 | metadataBuilder.add_processed(false); | ||
125 | auto metadataBuffer = metadataBuilder.Finish(); | ||
126 | Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
127 | //TODO we should reserve some space in metadata for in-place updates | ||
128 | |||
129 | flatbuffers::FlatBufferBuilder fbb; | ||
130 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); | ||
131 | |||
132 | storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); | ||
133 | storage().setMaxRevision(newRevision); | ||
134 | |||
135 | return Async::start<void>([this, key, entityType](Async::Future<void> &future) { | ||
130 | PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() { | 136 | PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() { |
131 | future.setFinished(); | 137 | future.setFinished(); |
132 | }); | 138 | }); |
133 | d->activePipelines << state; | 139 | d->activePipelines << state; |
134 | state.step(); | 140 | state.step(); |
135 | |||
136 | }); | 141 | }); |
137 | } | 142 | } |
138 | 143 | ||
@@ -157,10 +162,12 @@ void Pipeline::pipelineStepped(const PipelineState &state) | |||
157 | 162 | ||
158 | void Pipeline::scheduleStep() | 163 | void Pipeline::scheduleStep() |
159 | { | 164 | { |
160 | if (!d->stepScheduled) { | 165 | // if (!d->stepScheduled) { |
161 | d->stepScheduled = true; | 166 | // d->stepScheduled = true; |
162 | QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); | 167 | // QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); |
163 | } | 168 | // } |
169 | //FIXME make async again. For some reason the job in newEntity crashes if pipeline processing is async. | ||
170 | stepPipelines(); | ||
164 | } | 171 | } |
165 | 172 | ||
166 | void Pipeline::stepPipelines() | 173 | void Pipeline::stepPipelines() |
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index e14aa01..b43e4a3 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp | |||
@@ -116,26 +116,40 @@ public: | |||
116 | } | 116 | } |
117 | 117 | ||
118 | private slots: | 118 | private slots: |
119 | static void asyncWhile(const std::function<void(std::function<void(bool)>)> &body, const std::function<void()> &completionHandler) { | ||
120 | body([body, completionHandler](bool complete) { | ||
121 | if (complete) { | ||
122 | completionHandler(); | ||
123 | } else { | ||
124 | asyncWhile(body, completionHandler); | ||
125 | } | ||
126 | }); | ||
127 | } | ||
128 | |||
119 | void process() | 129 | void process() |
120 | { | 130 | { |
121 | if (mProcessingLock) { | 131 | if (mProcessingLock) { |
122 | return; | 132 | return; |
123 | } | 133 | } |
124 | mProcessingLock = true; | 134 | mProcessingLock = true; |
125 | //Empty queue after queue | 135 | auto job = processPipeline().then<void>([this](Async::Future<void> &future) { |
126 | //FIXME the for and while loops should be async, otherwise we process all messages in parallel | 136 | mProcessingLock = false; |
127 | for (auto queue : mCommandQueues) { | 137 | future.setFinished(); |
128 | qDebug() << "processing queue"; | 138 | }).exec(); |
129 | bool processedMessage = false; | 139 | } |
130 | while (processedMessage) { | 140 | |
131 | qDebug() << "process"; | 141 | Async::Job<void> processPipeline() |
132 | processedMessage = false; | 142 | { |
133 | queue->dequeue([this, &processedMessage](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) { | 143 | auto job = Async::start<void>([this](Async::Future<void> &future) { |
144 | //TODO process all queues in async for | ||
145 | auto queue = mCommandQueues.first(); | ||
146 | asyncWhile([&](std::function<void(bool)> whileCallback) { | ||
147 | queue->dequeue([this, whileCallback](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) { | ||
134 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size); | 148 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size); |
135 | if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { | 149 | if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { |
136 | qWarning() << "invalid buffer"; | 150 | qWarning() << "invalid buffer"; |
137 | processedMessage = false; | ||
138 | messageQueueCallback(false); | 151 | messageQueueCallback(false); |
152 | whileCallback(true); | ||
139 | return; | 153 | return; |
140 | } | 154 | } |
141 | auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); | 155 | auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); |
@@ -150,26 +164,32 @@ private slots: | |||
150 | break; | 164 | break; |
151 | case Akonadi2::Commands::CreateEntityCommand: { | 165 | case Akonadi2::Commands::CreateEntityCommand: { |
152 | //TODO job lifetime management | 166 | //TODO job lifetime management |
153 | auto job = mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<void>([&messageQueueCallback](Async::Future<void> future) { | 167 | mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<void>([&messageQueueCallback, whileCallback](Async::Future<void> &future) { |
154 | messageQueueCallback(true); | 168 | messageQueueCallback(true); |
155 | }); | 169 | whileCallback(false); |
156 | job.exec(); | 170 | future.setFinished(); |
171 | }).exec(); | ||
157 | } | 172 | } |
158 | break; | 173 | break; |
159 | default: | 174 | default: |
160 | //Unhandled command | 175 | //Unhandled command |
161 | qWarning() << "Unhandled command"; | 176 | qWarning() << "Unhandled command"; |
162 | messageQueueCallback(true); | 177 | messageQueueCallback(true); |
178 | whileCallback(false); | ||
163 | break; | 179 | break; |
164 | } | 180 | } |
165 | processedMessage = true; | ||
166 | }, | 181 | }, |
167 | [&processedMessage](const MessageQueue::Error &error) { | 182 | [whileCallback](const MessageQueue::Error &error) { |
168 | processedMessage = false; | 183 | qDebug() << "no more messages in queue"; |
184 | whileCallback(true); | ||
169 | }); | 185 | }); |
170 | } | 186 | }, |
171 | } | 187 | [&future]() { //while complete |
172 | mProcessingLock = false; | 188 | future.setFinished(); |
189 | //Call async-for completion handler | ||
190 | }); | ||
191 | }); | ||
192 | return job; | ||
173 | } | 193 | } |
174 | 194 | ||
175 | private: | 195 | private: |
@@ -226,6 +246,18 @@ void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &ri | |||
226 | }); | 246 | }); |
227 | } | 247 | } |
228 | 248 | ||
249 | void DummyResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) | ||
250 | { | ||
251 | m_fbb.Clear(); | ||
252 | auto commandData = m_fbb.CreateVector(reinterpret_cast<uint8_t const *>(data.data()), data.size()); | ||
253 | auto builder = Akonadi2::QueuedCommandBuilder(m_fbb); | ||
254 | builder.add_commandId(commandId); | ||
255 | builder.add_command(commandData); | ||
256 | auto buffer = builder.Finish(); | ||
257 | Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer); | ||
258 | mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); | ||
259 | } | ||
260 | |||
229 | Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) | 261 | Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) |
230 | { | 262 | { |
231 | qDebug() << "synchronizeWithSource"; | 263 | qDebug() << "synchronizeWithSource"; |
@@ -259,11 +291,7 @@ Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli | |||
259 | builder.add_attachment(attachment); | 291 | builder.add_attachment(attachment); |
260 | auto buffer = builder.Finish(); | 292 | auto buffer = builder.Finish(); |
261 | DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); | 293 | DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); |
262 | //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. | 294 | enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, QByteArray::fromRawData(reinterpret_cast<char const *>(m_fbb.GetBufferPointer()), m_fbb.GetSize())); |
263 | const auto key = QUuid::createUuid().toString().toUtf8(); | ||
264 | //TODO Create queuedcommand and push into synchronizerQueue | ||
265 | //* create message in store directly, add command to messagequeue waiting for processing. | ||
266 | // pipeline->newEntity<Akonadi2::Domain::Event>(key, m_fbb.GetBufferPointer(), m_fbb.GetSize()); | ||
267 | } else { //modification | 295 | } else { //modification |
268 | //TODO diff and create modification if necessary | 296 | //TODO diff and create modification if necessary |
269 | } | 297 | } |
@@ -279,14 +307,7 @@ void DummyResource::processCommand(int commandId, const QByteArray &data, uint s | |||
279 | //TODO instead of copying the command including the full entity first into the command queue, we could directly | 307 | //TODO instead of copying the command including the full entity first into the command queue, we could directly |
280 | //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). | 308 | //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). |
281 | //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire). | 309 | //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire). |
282 | m_fbb.Clear(); | 310 | enqueueCommand(mUserQueue, commandId, data); |
283 | auto commandData = m_fbb.CreateVector(reinterpret_cast<uint8_t const *>(data.data()), data.size()); | ||
284 | auto builder = Akonadi2::QueuedCommandBuilder(m_fbb); | ||
285 | builder.add_commandId(commandId); | ||
286 | builder.add_command(commandData); | ||
287 | auto buffer = builder.Finish(); | ||
288 | Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer); | ||
289 | mUserQueue.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); | ||
290 | } | 311 | } |
291 | 312 | ||
292 | DummyResourceFactory::DummyResourceFactory(QObject *parent) | 313 | DummyResourceFactory::DummyResourceFactory(QObject *parent) |
diff --git a/dummyresource/resourcefactory.h b/dummyresource/resourcefactory.h index 682f25c..6043fb6 100644 --- a/dummyresource/resourcefactory.h +++ b/dummyresource/resourcefactory.h | |||
@@ -39,6 +39,7 @@ public: | |||
39 | void configurePipeline(Akonadi2::Pipeline *pipeline); | 39 | void configurePipeline(Akonadi2::Pipeline *pipeline); |
40 | 40 | ||
41 | private: | 41 | private: |
42 | void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); | ||
42 | flatbuffers::FlatBufferBuilder m_fbb; | 43 | flatbuffers::FlatBufferBuilder m_fbb; |
43 | MessageQueue mUserQueue; | 44 | MessageQueue mUserQueue; |
44 | MessageQueue mSynchronizerQueue; | 45 | MessageQueue mSynchronizerQueue; |
diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp index ddb59a5..c469796 100644 --- a/tests/dummyresourcetest.cpp +++ b/tests/dummyresourcetest.cpp | |||
@@ -14,7 +14,7 @@ | |||
14 | 14 | ||
15 | static void removeFromDisk(const QString &name) | 15 | static void removeFromDisk(const QString &name) |
16 | { | 16 | { |
17 | Akonadi2::Storage store(Akonadi2::Store::storageLocation(), "org.kde.dummy", Akonadi2::Storage::ReadWrite); | 17 | Akonadi2::Storage store(Akonadi2::Store::storageLocation(), name, Akonadi2::Storage::ReadWrite); |
18 | store.removeFromDisk(); | 18 | store.removeFromDisk(); |
19 | } | 19 | } |
20 | 20 | ||
@@ -33,6 +33,9 @@ private Q_SLOTS: | |||
33 | 33 | ||
34 | void cleanupTestCase() | 34 | void cleanupTestCase() |
35 | { | 35 | { |
36 | removeFromDisk("org.kde.dummy"); | ||
37 | removeFromDisk("org.kde.dummy.userqueue"); | ||
38 | removeFromDisk("org.kde.dummy.synchronizerqueue"); | ||
36 | } | 39 | } |
37 | 40 | ||
38 | void testProcessCommand() | 41 | void testProcessCommand() |
@@ -60,13 +63,23 @@ private Q_SLOTS: | |||
60 | Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); | 63 | Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); |
61 | 64 | ||
62 | const QByteArray command(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize()); | 65 | const QByteArray command(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize()); |
66 | { | ||
67 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command.data()), command.size()); | ||
68 | QVERIFY(Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)); | ||
69 | } | ||
63 | 70 | ||
71 | //Actual test | ||
64 | Akonadi2::Pipeline pipeline("org.kde.dummy"); | 72 | Akonadi2::Pipeline pipeline("org.kde.dummy"); |
73 | QSignalSpy revisionSpy(&pipeline, SIGNAL(revisionUpdated())); | ||
65 | DummyResource resource; | 74 | DummyResource resource; |
66 | resource.configurePipeline(&pipeline); | 75 | resource.configurePipeline(&pipeline); |
67 | resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, command.size(), &pipeline); | 76 | resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, command.size(), &pipeline); |
68 | //TODO wait until the pipeline has processed the command | 77 | resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, command.size(), &pipeline); |
69 | QTest::qWait(1000); | 78 | |
79 | QVERIFY(revisionSpy.isValid()); | ||
80 | QTRY_COMPARE(revisionSpy.count(), 2); | ||
81 | QTest::qWait(100); | ||
82 | QCOMPARE(revisionSpy.count(), 2); | ||
70 | } | 83 | } |
71 | 84 | ||
72 | // void testResourceSync() | 85 | // void testResourceSync() |