diff options
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r-- | common/genericresource.cpp | 234 |
1 files changed, 234 insertions, 0 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp new file mode 100644 index 0000000..0b71500 --- /dev/null +++ b/common/genericresource.cpp | |||
@@ -0,0 +1,234 @@ | |||
1 | #include "genericresource.h" | ||
2 | |||
3 | #include "facade.h" | ||
4 | #include "entitybuffer.h" | ||
5 | #include "pipeline.h" | ||
6 | #include "queuedcommand_generated.h" | ||
7 | #include "createentity_generated.h" | ||
8 | #include "domainadaptor.h" | ||
9 | #include "commands.h" | ||
10 | #include "clientapi.h" | ||
11 | #include "index.h" | ||
12 | #include "log.h" | ||
13 | #include <assert.h> | ||
14 | |||
15 | using namespace Akonadi2; | ||
16 | |||
17 | /** | ||
18 | * Drives the pipeline using the output from all command queues | ||
19 | */ | ||
20 | class Processor : public QObject | ||
21 | { | ||
22 | Q_OBJECT | ||
23 | public: | ||
24 | Processor(Akonadi2::Pipeline *pipeline, QList<MessageQueue*> commandQueues) | ||
25 | : QObject(), | ||
26 | mPipeline(pipeline), | ||
27 | mCommandQueues(commandQueues), | ||
28 | mProcessingLock(false) | ||
29 | { | ||
30 | for (auto queue : mCommandQueues) { | ||
31 | const bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process); | ||
32 | Q_UNUSED(ret); | ||
33 | } | ||
34 | } | ||
35 | |||
36 | signals: | ||
37 | void error(int errorCode, const QString &errorMessage); | ||
38 | |||
39 | private: | ||
40 | bool messagesToProcessAvailable() | ||
41 | { | ||
42 | for (auto queue : mCommandQueues) { | ||
43 | if (!queue->isEmpty()) { | ||
44 | return true; | ||
45 | } | ||
46 | } | ||
47 | return false; | ||
48 | } | ||
49 | |||
50 | private slots: | ||
51 | void process() | ||
52 | { | ||
53 | if (mProcessingLock) { | ||
54 | return; | ||
55 | } | ||
56 | mProcessingLock = true; | ||
57 | auto job = processPipeline().then<void>([this]() { | ||
58 | mProcessingLock = false; | ||
59 | if (messagesToProcessAvailable()) { | ||
60 | process(); | ||
61 | } | ||
62 | }).exec(); | ||
63 | } | ||
64 | |||
65 | Async::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) | ||
66 | { | ||
67 | Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId()); | ||
68 | //Throw command into appropriate pipeline | ||
69 | switch (queuedCommand->commandId()) { | ||
70 | case Akonadi2::Commands::DeleteEntityCommand: | ||
71 | //mPipeline->removedEntity | ||
72 | return Async::null<void>(); | ||
73 | case Akonadi2::Commands::ModifyEntityCommand: | ||
74 | //mPipeline->modifiedEntity | ||
75 | return Async::null<void>(); | ||
76 | case Akonadi2::Commands::CreateEntityCommand: | ||
77 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); | ||
78 | default: | ||
79 | return Async::error<void>(-1, "Unhandled command"); | ||
80 | } | ||
81 | return Async::null<void>(); | ||
82 | } | ||
83 | |||
84 | //Process all messages of this queue | ||
85 | Async::Job<void> processQueue(MessageQueue *queue) | ||
86 | { | ||
87 | //TODO use something like: | ||
88 | //Async::foreach("pass iterator here").each("process value here").join(); | ||
89 | //Async::foreach("pass iterator here").parallel("process value here").join(); | ||
90 | return Async::dowhile( | ||
91 | [this, queue](Async::Future<bool> &future) { | ||
92 | if (queue->isEmpty()) { | ||
93 | future.setValue(false); | ||
94 | future.setFinished(); | ||
95 | return; | ||
96 | } | ||
97 | queue->dequeue( | ||
98 | [this, &future](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) { | ||
99 | auto callback = [messageQueueCallback, &future](bool success) { | ||
100 | messageQueueCallback(success); | ||
101 | future.setValue(!success); | ||
102 | future.setFinished(); | ||
103 | }; | ||
104 | |||
105 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size); | ||
106 | if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { | ||
107 | Warning() << "invalid buffer"; | ||
108 | callback(false); | ||
109 | return; | ||
110 | } | ||
111 | auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); | ||
112 | Trace() << "Dequeued Command: " << Akonadi2::Commands::name(queuedCommand->commandId()); | ||
113 | //TODO JOBAPI: job lifetime management | ||
114 | //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete | ||
115 | //themselves once done. In other cases we'd like jobs that only live as long as their handle though. | ||
116 | //FIXME this job is stack allocated and thus simply dies.... | ||
117 | processQueuedCommand(queuedCommand).then<void>( | ||
118 | [callback]() { | ||
119 | callback(true); | ||
120 | }, | ||
121 | [callback](int errorCode, QString errorMessage) { | ||
122 | Warning() << "Error while processing queue command: " << errorMessage; | ||
123 | callback(false); | ||
124 | } | ||
125 | ).exec(); | ||
126 | }, | ||
127 | [&future](const MessageQueue::Error &error) { | ||
128 | Warning() << "Error while getting message from messagequeue: " << error.message; | ||
129 | future.setValue(false); | ||
130 | future.setFinished(); | ||
131 | } | ||
132 | ); | ||
133 | } | ||
134 | ); | ||
135 | } | ||
136 | |||
137 | Async::Job<void> processPipeline() | ||
138 | { | ||
139 | //Go through all message queues | ||
140 | auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); | ||
141 | return Async::dowhile( | ||
142 | [it]() { return it->hasNext(); }, | ||
143 | [it, this](Async::Future<void> &future) { | ||
144 | auto queue = it->next(); | ||
145 | processQueue(queue).then<void>([&future]() { | ||
146 | Trace() << "Queue processed"; | ||
147 | future.setFinished(); | ||
148 | }).exec(); | ||
149 | } | ||
150 | ); | ||
151 | } | ||
152 | |||
153 | private: | ||
154 | Akonadi2::Pipeline *mPipeline; | ||
155 | //Ordered by priority | ||
156 | QList<MessageQueue*> mCommandQueues; | ||
157 | bool mProcessingLock; | ||
158 | }; | ||
159 | |||
160 | |||
161 | GenericResource::GenericResource(const QByteArray &resourceIdentifier) | ||
162 | : Akonadi2::Resource(), | ||
163 | mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde." + resourceIdentifier + ".userqueue"), | ||
164 | mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde." + resourceIdentifier + ".synchronizerqueue"), | ||
165 | mError(0) | ||
166 | { | ||
167 | } | ||
168 | |||
169 | GenericResource::~GenericResource() | ||
170 | { | ||
171 | |||
172 | } | ||
173 | |||
174 | void GenericResource::configurePipeline(Akonadi2::Pipeline *pipeline) | ||
175 | { | ||
176 | //TODO figure out lifetime of the processor | ||
177 | mProcessor = new Processor(pipeline, QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); | ||
178 | QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | ||
179 | } | ||
180 | |||
181 | void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) | ||
182 | { | ||
183 | Warning() << "Received error from Processor: " << errorCode << errorMessage; | ||
184 | mError = errorCode; | ||
185 | } | ||
186 | |||
187 | int GenericResource::error() const | ||
188 | { | ||
189 | return mError; | ||
190 | } | ||
191 | |||
192 | void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) | ||
193 | { | ||
194 | //TODO get rid of m_fbb member variable | ||
195 | m_fbb.Clear(); | ||
196 | auto commandData = Akonadi2::EntityBuffer::appendAsVector(m_fbb, data.constData(), data.size()); | ||
197 | auto buffer = Akonadi2::CreateQueuedCommand(m_fbb, commandId, commandData); | ||
198 | Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer); | ||
199 | mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); | ||
200 | } | ||
201 | |||
202 | void GenericResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) | ||
203 | { | ||
204 | //TODO instead of copying the command including the full entity first into the command queue, we could directly | ||
205 | //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). | ||
206 | //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire). | ||
207 | enqueueCommand(mUserQueue, commandId, data); | ||
208 | } | ||
209 | |||
210 | Async::Job<void> GenericResource::processAllMessages() | ||
211 | { | ||
212 | //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. | ||
213 | //TODO: report errors while processing sync? | ||
214 | //TODO JOBAPI: A helper that waits for n events and then continues? | ||
215 | return Async::start<void>([this](Async::Future<void> &f) { | ||
216 | if (mSynchronizerQueue.isEmpty()) { | ||
217 | f.setFinished(); | ||
218 | } else { | ||
219 | QObject::connect(&mSynchronizerQueue, &MessageQueue::drained, [&f]() { | ||
220 | f.setFinished(); | ||
221 | }); | ||
222 | } | ||
223 | }).then<void>([this](Async::Future<void> &f) { | ||
224 | if (mUserQueue.isEmpty()) { | ||
225 | f.setFinished(); | ||
226 | } else { | ||
227 | QObject::connect(&mUserQueue, &MessageQueue::drained, [&f]() { | ||
228 | f.setFinished(); | ||
229 | }); | ||
230 | } | ||
231 | }); | ||
232 | } | ||
233 | |||
234 | #include "genericresource.moc" | ||