diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-04-20 22:22:19 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-04-20 22:22:19 +0200 |
commit | 47f105febcd17d6db1f998a99c6c6c423851573a (patch) | |
tree | 1f02b0e09444c55bd509984233b918b5a1937357 | |
parent | b4db894f76de9ac252081972143efcd3fcd66533 (diff) | |
download | sink-47f105febcd17d6db1f998a99c6c6c423851573a.tar.gz sink-47f105febcd17d6db1f998a99c6c6c423851573a.zip |
Moved generic part of resource implementation to GenericResource
-rw-r--r-- | common/CMakeLists.txt | 1 | ||||
-rw-r--r-- | common/genericresource.cpp | 234 | ||||
-rw-r--r-- | common/genericresource.h | 59 | ||||
-rw-r--r-- | examples/dummyresource/resourcefactory.cpp | 200 | ||||
-rw-r--r-- | examples/dummyresource/resourcefactory.h | 22 |
5 files changed, 301 insertions, 215 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index ce237c5..c18d98a 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt | |||
@@ -33,6 +33,7 @@ set(command_SRCS | |||
33 | pipeline.cpp | 33 | pipeline.cpp |
34 | domainadaptor.cpp | 34 | domainadaptor.cpp |
35 | resource.cpp | 35 | resource.cpp |
36 | genericresource.cpp | ||
36 | resourceaccess.cpp | 37 | resourceaccess.cpp |
37 | storage_common.cpp | 38 | storage_common.cpp |
38 | threadboundary.cpp | 39 | threadboundary.cpp |
diff --git a/common/genericresource.cpp b/common/genericresource.cpp new file mode 100644 index 0000000..0b71500 --- /dev/null +++ b/common/genericresource.cpp | |||
@@ -0,0 +1,234 @@ | |||
1 | #include "genericresource.h" | ||
2 | |||
3 | #include "facade.h" | ||
4 | #include "entitybuffer.h" | ||
5 | #include "pipeline.h" | ||
6 | #include "queuedcommand_generated.h" | ||
7 | #include "createentity_generated.h" | ||
8 | #include "domainadaptor.h" | ||
9 | #include "commands.h" | ||
10 | #include "clientapi.h" | ||
11 | #include "index.h" | ||
12 | #include "log.h" | ||
13 | #include <assert.h> | ||
14 | |||
15 | using namespace Akonadi2; | ||
16 | |||
17 | /** | ||
18 | * Drives the pipeline using the output from all command queues | ||
19 | */ | ||
20 | class Processor : public QObject | ||
21 | { | ||
22 | Q_OBJECT | ||
23 | public: | ||
24 | Processor(Akonadi2::Pipeline *pipeline, QList<MessageQueue*> commandQueues) | ||
25 | : QObject(), | ||
26 | mPipeline(pipeline), | ||
27 | mCommandQueues(commandQueues), | ||
28 | mProcessingLock(false) | ||
29 | { | ||
30 | for (auto queue : mCommandQueues) { | ||
31 | const bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process); | ||
32 | Q_UNUSED(ret); | ||
33 | } | ||
34 | } | ||
35 | |||
36 | signals: | ||
37 | void error(int errorCode, const QString &errorMessage); | ||
38 | |||
39 | private: | ||
40 | bool messagesToProcessAvailable() | ||
41 | { | ||
42 | for (auto queue : mCommandQueues) { | ||
43 | if (!queue->isEmpty()) { | ||
44 | return true; | ||
45 | } | ||
46 | } | ||
47 | return false; | ||
48 | } | ||
49 | |||
50 | private slots: | ||
51 | void process() | ||
52 | { | ||
53 | if (mProcessingLock) { | ||
54 | return; | ||
55 | } | ||
56 | mProcessingLock = true; | ||
57 | auto job = processPipeline().then<void>([this]() { | ||
58 | mProcessingLock = false; | ||
59 | if (messagesToProcessAvailable()) { | ||
60 | process(); | ||
61 | } | ||
62 | }).exec(); | ||
63 | } | ||
64 | |||
65 | Async::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) | ||
66 | { | ||
67 | Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId()); | ||
68 | //Throw command into appropriate pipeline | ||
69 | switch (queuedCommand->commandId()) { | ||
70 | case Akonadi2::Commands::DeleteEntityCommand: | ||
71 | //mPipeline->removedEntity | ||
72 | return Async::null<void>(); | ||
73 | case Akonadi2::Commands::ModifyEntityCommand: | ||
74 | //mPipeline->modifiedEntity | ||
75 | return Async::null<void>(); | ||
76 | case Akonadi2::Commands::CreateEntityCommand: | ||
77 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); | ||
78 | default: | ||
79 | return Async::error<void>(-1, "Unhandled command"); | ||
80 | } | ||
81 | return Async::null<void>(); | ||
82 | } | ||
83 | |||
84 | //Process all messages of this queue | ||
85 | Async::Job<void> processQueue(MessageQueue *queue) | ||
86 | { | ||
87 | //TODO use something like: | ||
88 | //Async::foreach("pass iterator here").each("process value here").join(); | ||
89 | //Async::foreach("pass iterator here").parallel("process value here").join(); | ||
90 | return Async::dowhile( | ||
91 | [this, queue](Async::Future<bool> &future) { | ||
92 | if (queue->isEmpty()) { | ||
93 | future.setValue(false); | ||
94 | future.setFinished(); | ||
95 | return; | ||
96 | } | ||
97 | queue->dequeue( | ||
98 | [this, &future](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) { | ||
99 | auto callback = [messageQueueCallback, &future](bool success) { | ||
100 | messageQueueCallback(success); | ||
101 | future.setValue(!success); | ||
102 | future.setFinished(); | ||
103 | }; | ||
104 | |||
105 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size); | ||
106 | if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { | ||
107 | Warning() << "invalid buffer"; | ||
108 | callback(false); | ||
109 | return; | ||
110 | } | ||
111 | auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); | ||
112 | Trace() << "Dequeued Command: " << Akonadi2::Commands::name(queuedCommand->commandId()); | ||
113 | //TODO JOBAPI: job lifetime management | ||
114 | //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete | ||
115 | //themselves once done. In other cases we'd like jobs that only live as long as their handle though. | ||
116 | //FIXME this job is stack allocated and thus simply dies.... | ||
117 | processQueuedCommand(queuedCommand).then<void>( | ||
118 | [callback]() { | ||
119 | callback(true); | ||
120 | }, | ||
121 | [callback](int errorCode, QString errorMessage) { | ||
122 | Warning() << "Error while processing queue command: " << errorMessage; | ||
123 | callback(false); | ||
124 | } | ||
125 | ).exec(); | ||
126 | }, | ||
127 | [&future](const MessageQueue::Error &error) { | ||
128 | Warning() << "Error while getting message from messagequeue: " << error.message; | ||
129 | future.setValue(false); | ||
130 | future.setFinished(); | ||
131 | } | ||
132 | ); | ||
133 | } | ||
134 | ); | ||
135 | } | ||
136 | |||
137 | Async::Job<void> processPipeline() | ||
138 | { | ||
139 | //Go through all message queues | ||
140 | auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); | ||
141 | return Async::dowhile( | ||
142 | [it]() { return it->hasNext(); }, | ||
143 | [it, this](Async::Future<void> &future) { | ||
144 | auto queue = it->next(); | ||
145 | processQueue(queue).then<void>([&future]() { | ||
146 | Trace() << "Queue processed"; | ||
147 | future.setFinished(); | ||
148 | }).exec(); | ||
149 | } | ||
150 | ); | ||
151 | } | ||
152 | |||
153 | private: | ||
154 | Akonadi2::Pipeline *mPipeline; | ||
155 | //Ordered by priority | ||
156 | QList<MessageQueue*> mCommandQueues; | ||
157 | bool mProcessingLock; | ||
158 | }; | ||
159 | |||
160 | |||
161 | GenericResource::GenericResource(const QByteArray &resourceIdentifier) | ||
162 | : Akonadi2::Resource(), | ||
163 | mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde." + resourceIdentifier + ".userqueue"), | ||
164 | mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde." + resourceIdentifier + ".synchronizerqueue"), | ||
165 | mError(0) | ||
166 | { | ||
167 | } | ||
168 | |||
169 | GenericResource::~GenericResource() | ||
170 | { | ||
171 | |||
172 | } | ||
173 | |||
174 | void GenericResource::configurePipeline(Akonadi2::Pipeline *pipeline) | ||
175 | { | ||
176 | //TODO figure out lifetime of the processor | ||
177 | mProcessor = new Processor(pipeline, QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); | ||
178 | QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | ||
179 | } | ||
180 | |||
181 | void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) | ||
182 | { | ||
183 | Warning() << "Received error from Processor: " << errorCode << errorMessage; | ||
184 | mError = errorCode; | ||
185 | } | ||
186 | |||
187 | int GenericResource::error() const | ||
188 | { | ||
189 | return mError; | ||
190 | } | ||
191 | |||
192 | void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) | ||
193 | { | ||
194 | //TODO get rid of m_fbb member variable | ||
195 | m_fbb.Clear(); | ||
196 | auto commandData = Akonadi2::EntityBuffer::appendAsVector(m_fbb, data.constData(), data.size()); | ||
197 | auto buffer = Akonadi2::CreateQueuedCommand(m_fbb, commandId, commandData); | ||
198 | Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer); | ||
199 | mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); | ||
200 | } | ||
201 | |||
202 | void GenericResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) | ||
203 | { | ||
204 | //TODO instead of copying the command including the full entity first into the command queue, we could directly | ||
205 | //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). | ||
206 | //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire). | ||
207 | enqueueCommand(mUserQueue, commandId, data); | ||
208 | } | ||
209 | |||
210 | Async::Job<void> GenericResource::processAllMessages() | ||
211 | { | ||
212 | //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. | ||
213 | //TODO: report errors while processing sync? | ||
214 | //TODO JOBAPI: A helper that waits for n events and then continues? | ||
215 | return Async::start<void>([this](Async::Future<void> &f) { | ||
216 | if (mSynchronizerQueue.isEmpty()) { | ||
217 | f.setFinished(); | ||
218 | } else { | ||
219 | QObject::connect(&mSynchronizerQueue, &MessageQueue::drained, [&f]() { | ||
220 | f.setFinished(); | ||
221 | }); | ||
222 | } | ||
223 | }).then<void>([this](Async::Future<void> &f) { | ||
224 | if (mUserQueue.isEmpty()) { | ||
225 | f.setFinished(); | ||
226 | } else { | ||
227 | QObject::connect(&mUserQueue, &MessageQueue::drained, [&f]() { | ||
228 | f.setFinished(); | ||
229 | }); | ||
230 | } | ||
231 | }); | ||
232 | } | ||
233 | |||
234 | #include "genericresource.moc" | ||
diff --git a/common/genericresource.h b/common/genericresource.h new file mode 100644 index 0000000..36fa567 --- /dev/null +++ b/common/genericresource.h | |||
@@ -0,0 +1,59 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | #pragma once | ||
21 | |||
22 | #include <akonadi2common_export.h> | ||
23 | #include <resource.h> | ||
24 | #include <messagequeue.h> | ||
25 | |||
26 | class Processor; | ||
27 | |||
28 | namespace Akonadi2 | ||
29 | { | ||
30 | |||
31 | /** | ||
32 | * Generic Resource implementation. | ||
33 | */ | ||
34 | class AKONADI2COMMON_EXPORT GenericResource : public Resource | ||
35 | { | ||
36 | public: | ||
37 | GenericResource(const QByteArray &resourceIdentifier); | ||
38 | virtual ~GenericResource(); | ||
39 | |||
40 | virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline) Q_DECL_OVERRIDE; | ||
41 | virtual Async::Job<void> synchronizeWithSource(Pipeline *pipeline) Q_DECL_OVERRIDE = 0; | ||
42 | virtual Async::Job<void> processAllMessages() Q_DECL_OVERRIDE; | ||
43 | |||
44 | virtual void configurePipeline(Pipeline *pipeline) Q_DECL_OVERRIDE; | ||
45 | int error() const; | ||
46 | |||
47 | protected: | ||
48 | void onProcessorError(int errorCode, const QString &errorMessage); | ||
49 | void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); | ||
50 | flatbuffers::FlatBufferBuilder m_fbb; | ||
51 | MessageQueue mUserQueue; | ||
52 | MessageQueue mSynchronizerQueue; | ||
53 | |||
54 | private: | ||
55 | Processor *mProcessor; | ||
56 | int mError; | ||
57 | }; | ||
58 | |||
59 | } | ||
diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp index d5765e2..a4cd68d 100644 --- a/examples/dummyresource/resourcefactory.cpp +++ b/examples/dummyresource/resourcefactory.cpp | |||
@@ -103,152 +103,10 @@ QMap<QString, QString> populate() | |||
103 | 103 | ||
104 | static QMap<QString, QString> s_dataSource = populate(); | 104 | static QMap<QString, QString> s_dataSource = populate(); |
105 | 105 | ||
106 | //Drives the pipeline using the output from all command queues | ||
107 | class Processor : public QObject | ||
108 | { | ||
109 | Q_OBJECT | ||
110 | public: | ||
111 | Processor(Akonadi2::Pipeline *pipeline, QList<MessageQueue*> commandQueues) | ||
112 | : QObject(), | ||
113 | mPipeline(pipeline), | ||
114 | mCommandQueues(commandQueues), | ||
115 | mProcessingLock(false) | ||
116 | { | ||
117 | for (auto queue : mCommandQueues) { | ||
118 | const bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process); | ||
119 | Q_UNUSED(ret); | ||
120 | } | ||
121 | } | ||
122 | |||
123 | signals: | ||
124 | void error(int errorCode, const QString &errorMessage); | ||
125 | |||
126 | private: | ||
127 | bool messagesToProcessAvailable() | ||
128 | { | ||
129 | for (auto queue : mCommandQueues) { | ||
130 | if (!queue->isEmpty()) { | ||
131 | return true; | ||
132 | } | ||
133 | } | ||
134 | return false; | ||
135 | } | ||
136 | |||
137 | private slots: | ||
138 | void process() | ||
139 | { | ||
140 | if (mProcessingLock) { | ||
141 | return; | ||
142 | } | ||
143 | mProcessingLock = true; | ||
144 | auto job = processPipeline().then<void>([this]() { | ||
145 | mProcessingLock = false; | ||
146 | if (messagesToProcessAvailable()) { | ||
147 | process(); | ||
148 | } | ||
149 | }).exec(); | ||
150 | } | ||
151 | |||
152 | Async::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) | ||
153 | { | ||
154 | Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId()); | ||
155 | //Throw command into appropriate pipeline | ||
156 | switch (queuedCommand->commandId()) { | ||
157 | case Akonadi2::Commands::DeleteEntityCommand: | ||
158 | //mPipeline->removedEntity | ||
159 | return Async::null<void>(); | ||
160 | case Akonadi2::Commands::ModifyEntityCommand: | ||
161 | //mPipeline->modifiedEntity | ||
162 | return Async::null<void>(); | ||
163 | case Akonadi2::Commands::CreateEntityCommand: | ||
164 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); | ||
165 | default: | ||
166 | return Async::error<void>(-1, "Unhandled command"); | ||
167 | } | ||
168 | return Async::null<void>(); | ||
169 | } | ||
170 | |||
171 | //Process all messages of this queue | ||
172 | Async::Job<void> processQueue(MessageQueue *queue) | ||
173 | { | ||
174 | //TODO use something like: | ||
175 | //Async::foreach("pass iterator here").each("process value here").join(); | ||
176 | //Async::foreach("pass iterator here").parallel("process value here").join(); | ||
177 | return Async::dowhile( | ||
178 | [this, queue](Async::Future<bool> &future) { | ||
179 | if (queue->isEmpty()) { | ||
180 | future.setValue(false); | ||
181 | future.setFinished(); | ||
182 | return; | ||
183 | } | ||
184 | queue->dequeue( | ||
185 | [this, &future](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) { | ||
186 | auto callback = [messageQueueCallback, &future](bool success) { | ||
187 | messageQueueCallback(success); | ||
188 | future.setValue(!success); | ||
189 | future.setFinished(); | ||
190 | }; | ||
191 | |||
192 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size); | ||
193 | if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { | ||
194 | Warning() << "invalid buffer"; | ||
195 | callback(false); | ||
196 | return; | ||
197 | } | ||
198 | auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); | ||
199 | Trace() << "Dequeued Command: " << Akonadi2::Commands::name(queuedCommand->commandId()); | ||
200 | //TODO JOBAPI: job lifetime management | ||
201 | //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete | ||
202 | //themselves once done. In other cases we'd like jobs that only live as long as their handle though. | ||
203 | //FIXME this job is stack allocated and thus simply dies.... | ||
204 | processQueuedCommand(queuedCommand).then<void>( | ||
205 | [callback]() { | ||
206 | callback(true); | ||
207 | }, | ||
208 | [callback](int errorCode, QString errorMessage) { | ||
209 | Warning() << "Error while processing queue command: " << errorMessage; | ||
210 | callback(false); | ||
211 | } | ||
212 | ).exec(); | ||
213 | }, | ||
214 | [&future](const MessageQueue::Error &error) { | ||
215 | Warning() << "Error while getting message from messagequeue: " << error.message; | ||
216 | future.setValue(false); | ||
217 | future.setFinished(); | ||
218 | } | ||
219 | ); | ||
220 | } | ||
221 | ); | ||
222 | } | ||
223 | |||
224 | Async::Job<void> processPipeline() | ||
225 | { | ||
226 | //Go through all message queues | ||
227 | auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); | ||
228 | return Async::dowhile( | ||
229 | [it]() { return it->hasNext(); }, | ||
230 | [it, this](Async::Future<void> &future) { | ||
231 | auto queue = it->next(); | ||
232 | processQueue(queue).then<void>([&future]() { | ||
233 | Trace() << "Queue processed"; | ||
234 | future.setFinished(); | ||
235 | }).exec(); | ||
236 | } | ||
237 | ); | ||
238 | } | ||
239 | |||
240 | private: | ||
241 | Akonadi2::Pipeline *mPipeline; | ||
242 | //Ordered by priority | ||
243 | QList<MessageQueue*> mCommandQueues; | ||
244 | bool mProcessingLock; | ||
245 | }; | ||
246 | 106 | ||
107 | //FIXME We need to pass the resource-instance name to generic resource, not the plugin name | ||
247 | DummyResource::DummyResource() | 108 | DummyResource::DummyResource() |
248 | : Akonadi2::Resource(), | 109 | : Akonadi2::GenericResource(PLUGIN_NAME) |
249 | mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.userqueue"), | ||
250 | mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue"), | ||
251 | mError(0) | ||
252 | { | 110 | { |
253 | } | 111 | } |
254 | 112 | ||
@@ -277,19 +135,7 @@ void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) | |||
277 | 135 | ||
278 | //event is the entitytype and not the domain type | 136 | //event is the entitytype and not the domain type |
279 | pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer << uidIndexer); | 137 | pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer << uidIndexer); |
280 | mProcessor = new Processor(pipeline, QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); | 138 | GenericResource::configurePipeline(pipeline); |
281 | QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | ||
282 | } | ||
283 | |||
284 | void DummyResource::onProcessorError(int errorCode, const QString &errorMessage) | ||
285 | { | ||
286 | Warning() << "Received error from Processor: " << errorCode << errorMessage; | ||
287 | mError = errorCode; | ||
288 | } | ||
289 | |||
290 | int DummyResource::error() const | ||
291 | { | ||
292 | return mError; | ||
293 | } | 139 | } |
294 | 140 | ||
295 | void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &rid, std::function<void(void *keyValue, int keySize, void *dataValue, int dataSize)> callback) | 141 | void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &rid, std::function<void(void *keyValue, int keySize, void *dataValue, int dataSize)> callback) |
@@ -316,15 +162,6 @@ void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &ri | |||
316 | }); | 162 | }); |
317 | } | 163 | } |
318 | 164 | ||
319 | void DummyResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) | ||
320 | { | ||
321 | m_fbb.Clear(); | ||
322 | auto commandData = Akonadi2::EntityBuffer::appendAsVector(m_fbb, data.constData(), data.size()); | ||
323 | auto buffer = Akonadi2::CreateQueuedCommand(m_fbb, commandId, commandData); | ||
324 | Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer); | ||
325 | mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); | ||
326 | } | ||
327 | |||
328 | Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) | 165 | Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) |
329 | { | 166 | { |
330 | return Async::start<void>([this, pipeline](Async::Future<void> &f) { | 167 | return Async::start<void>([this, pipeline](Async::Future<void> &f) { |
@@ -377,37 +214,6 @@ Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli | |||
377 | }); | 214 | }); |
378 | } | 215 | } |
379 | 216 | ||
380 | Async::Job<void> DummyResource::processAllMessages() | ||
381 | { | ||
382 | //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. | ||
383 | //TODO: report errors while processing sync? | ||
384 | //TODO JOBAPI: A helper that waits for n events and then continues? | ||
385 | return Async::start<void>([this](Async::Future<void> &f) { | ||
386 | if (mSynchronizerQueue.isEmpty()) { | ||
387 | f.setFinished(); | ||
388 | } else { | ||
389 | QObject::connect(&mSynchronizerQueue, &MessageQueue::drained, [&f]() { | ||
390 | f.setFinished(); | ||
391 | }); | ||
392 | } | ||
393 | }).then<void>([this](Async::Future<void> &f) { | ||
394 | if (mUserQueue.isEmpty()) { | ||
395 | f.setFinished(); | ||
396 | } else { | ||
397 | QObject::connect(&mUserQueue, &MessageQueue::drained, [&f]() { | ||
398 | f.setFinished(); | ||
399 | }); | ||
400 | } | ||
401 | }); | ||
402 | } | ||
403 | |||
404 | void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) | ||
405 | { | ||
406 | //TODO instead of copying the command including the full entity first into the command queue, we could directly | ||
407 | //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). | ||
408 | //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire). | ||
409 | enqueueCommand(mUserQueue, commandId, data); | ||
410 | } | ||
411 | 217 | ||
412 | DummyResourceFactory::DummyResourceFactory(QObject *parent) | 218 | DummyResourceFactory::DummyResourceFactory(QObject *parent) |
413 | : Akonadi2::ResourceFactory(parent) | 219 | : Akonadi2::ResourceFactory(parent) |
diff --git a/examples/dummyresource/resourcefactory.h b/examples/dummyresource/resourcefactory.h index 3b99d5e..7b7783e 100644 --- a/examples/dummyresource/resourcefactory.h +++ b/examples/dummyresource/resourcefactory.h | |||
@@ -19,7 +19,7 @@ | |||
19 | 19 | ||
20 | #pragma once | 20 | #pragma once |
21 | 21 | ||
22 | #include "common/resource.h" | 22 | #include "common/genericresource.h" |
23 | #include "async/src/async.h" | 23 | #include "async/src/async.h" |
24 | #include "common/messagequeue.h" | 24 | #include "common/messagequeue.h" |
25 | 25 | ||
@@ -28,26 +28,12 @@ | |||
28 | //TODO: a little ugly to have this in two places, once here and once in Q_PLUGIN_METADATA | 28 | //TODO: a little ugly to have this in two places, once here and once in Q_PLUGIN_METADATA |
29 | #define PLUGIN_NAME "org.kde.dummy" | 29 | #define PLUGIN_NAME "org.kde.dummy" |
30 | 30 | ||
31 | class Processor; | 31 | class DummyResource : public Akonadi2::GenericResource |
32 | |||
33 | class DummyResource : public Akonadi2::Resource | ||
34 | { | 32 | { |
35 | public: | 33 | public: |
36 | DummyResource(); | 34 | DummyResource(); |
37 | Async::Job<void> synchronizeWithSource(Akonadi2::Pipeline *pipeline); | 35 | Async::Job<void> synchronizeWithSource(Akonadi2::Pipeline *pipeline) Q_DECL_OVERRIDE; |
38 | Async::Job<void> processAllMessages(); | 36 | void configurePipeline(Akonadi2::Pipeline *pipeline) Q_DECL_OVERRIDE; |
39 | void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline); | ||
40 | void configurePipeline(Akonadi2::Pipeline *pipeline); | ||
41 | int error() const; | ||
42 | |||
43 | private: | ||
44 | void onProcessorError(int errorCode, const QString &errorMessage); | ||
45 | void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); | ||
46 | flatbuffers::FlatBufferBuilder m_fbb; | ||
47 | MessageQueue mUserQueue; | ||
48 | MessageQueue mSynchronizerQueue; | ||
49 | Processor *mProcessor; | ||
50 | int mError; | ||
51 | }; | 37 | }; |
52 | 38 | ||
53 | class DummyResourceFactory : public Akonadi2::ResourceFactory | 39 | class DummyResourceFactory : public Akonadi2::ResourceFactory |