diff options
Diffstat (limited to 'dummyresource')
-rw-r--r-- | dummyresource/domainadaptor.cpp | 5 | ||||
-rw-r--r-- | dummyresource/facade.cpp | 24 | ||||
-rw-r--r-- | dummyresource/resourcefactory.cpp | 123 | ||||
-rw-r--r-- | dummyresource/resourcefactory.h | 6 |
4 files changed, 142 insertions, 16 deletions
diff --git a/dummyresource/domainadaptor.cpp b/dummyresource/domainadaptor.cpp index ae001cf..9bd3770 100644 --- a/dummyresource/domainadaptor.cpp +++ b/dummyresource/domainadaptor.cpp | |||
@@ -29,8 +29,8 @@ public: | |||
29 | 29 | ||
30 | void setProperty(const QString &key, const QVariant &value) | 30 | void setProperty(const QString &key, const QVariant &value) |
31 | { | 31 | { |
32 | if (mResourceMapper->mWriteAccessors.contains(key)) { | 32 | if (mResourceMapper && mResourceMapper->mWriteAccessors.contains(key)) { |
33 | // mResourceMapper.setProperty(key, value, mResourceBuffer); | 33 | // mResourceMapper->setProperty(key, value, mResourceBuffer); |
34 | } else { | 34 | } else { |
35 | // mLocalMapper.; | 35 | // mLocalMapper.; |
36 | } | 36 | } |
@@ -69,6 +69,7 @@ DummyEventAdaptorFactory::DummyEventAdaptorFactory() | |||
69 | mResourceMapper->mReadAccessors.insert("summary", [](DummyEvent const *buffer) -> QVariant { | 69 | mResourceMapper->mReadAccessors.insert("summary", [](DummyEvent const *buffer) -> QVariant { |
70 | return QString::fromStdString(buffer->summary()->c_str()); | 70 | return QString::fromStdString(buffer->summary()->c_str()); |
71 | }); | 71 | }); |
72 | mLocalMapper = QSharedPointer<PropertyMapper<Akonadi2::Domain::Buffer::Event> >::create(); | ||
72 | //TODO set accessors for all properties | 73 | //TODO set accessors for all properties |
73 | 74 | ||
74 | } | 75 | } |
diff --git a/dummyresource/facade.cpp b/dummyresource/facade.cpp index f754c7e..668fbbf 100644 --- a/dummyresource/facade.cpp +++ b/dummyresource/facade.cpp | |||
@@ -28,6 +28,7 @@ | |||
28 | #include "event_generated.h" | 28 | #include "event_generated.h" |
29 | #include "entity_generated.h" | 29 | #include "entity_generated.h" |
30 | #include "metadata_generated.h" | 30 | #include "metadata_generated.h" |
31 | #include "createentity_generated.h" | ||
31 | #include "domainadaptor.h" | 32 | #include "domainadaptor.h" |
32 | #include <common/entitybuffer.h> | 33 | #include <common/entitybuffer.h> |
33 | 34 | ||
@@ -48,6 +49,29 @@ DummyResourceFacade::~DummyResourceFacade() | |||
48 | void DummyResourceFacade::create(const Akonadi2::Domain::Event &domainObject) | 49 | void DummyResourceFacade::create(const Akonadi2::Domain::Event &domainObject) |
49 | { | 50 | { |
50 | //Create message buffer and send to resource | 51 | //Create message buffer and send to resource |
52 | flatbuffers::FlatBufferBuilder eventFbb; | ||
53 | eventFbb.Clear(); | ||
54 | { | ||
55 | auto summary = eventFbb.CreateString("summary"); | ||
56 | // auto data = fbb.CreateUninitializedVector<uint8_t>(attachmentSize); | ||
57 | DummyCalendar::DummyEventBuilder eventBuilder(eventFbb); | ||
58 | eventBuilder.add_summary(summary); | ||
59 | auto eventLocation = eventBuilder.Finish(); | ||
60 | DummyCalendar::FinishDummyEventBuffer(eventFbb, eventLocation); | ||
61 | // memcpy((void*)DummyCalendar::GetDummyEvent(fbb.GetBufferPointer())->attachment()->Data(), rawData, attachmentSize); | ||
62 | } | ||
63 | flatbuffers::FlatBufferBuilder entityFbb; | ||
64 | Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, eventFbb.GetBufferPointer(), eventFbb.GetSize(), 0, 0); | ||
65 | |||
66 | flatbuffers::FlatBufferBuilder fbb; | ||
67 | auto type = fbb.CreateString(Akonadi2::Domain::getTypeName<Akonadi2::Domain::Event>().toStdString().data()); | ||
68 | auto delta = fbb.CreateVector<uint8_t>(entityFbb.GetBufferPointer(), entityFbb.GetSize()); | ||
69 | Akonadi2::Commands::CreateEntityBuilder builder(fbb); | ||
70 | builder.add_domainType(type); | ||
71 | builder.add_delta(delta); | ||
72 | auto location = builder.Finish(); | ||
73 | Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); | ||
74 | mResourceAccess->sendCommand(Akonadi2::Commands::CreateEntityCommand, fbb); | ||
51 | } | 75 | } |
52 | 76 | ||
53 | void DummyResourceFacade::modify(const Akonadi2::Domain::Event &domainObject) | 77 | void DummyResourceFacade::modify(const Akonadi2::Domain::Event &domainObject) |
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index e4f7e41..e14aa01 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp | |||
@@ -23,18 +23,20 @@ | |||
23 | #include "pipeline.h" | 23 | #include "pipeline.h" |
24 | #include "dummycalendar_generated.h" | 24 | #include "dummycalendar_generated.h" |
25 | #include "metadata_generated.h" | 25 | #include "metadata_generated.h" |
26 | #include "queuedcommand_generated.h" | ||
26 | #include "domainadaptor.h" | 27 | #include "domainadaptor.h" |
28 | #include "commands.h" | ||
29 | #include "clientapi.h" | ||
27 | #include <QUuid> | 30 | #include <QUuid> |
28 | 31 | ||
29 | /* | 32 | /* |
30 | * Figure out how to implement various classes of processors: | 33 | * Figure out how to implement various classes of processors: |
31 | * * read-only (index and such) => domain adapter | 34 | * * read-only (index and such) => extractor function, probably using domain adaptor |
32 | * * filter => provide means to move entity elsewhere, and also reflect change in source (I guess?) | 35 | * * filter => provide means to move entity elsewhere, and also reflect change in source (I guess?) |
33 | * * flag extractors? => like read-only? Or write to local portion of buffer? | 36 | * * flag extractors? => like read-only? Or write to local portion of buffer? |
34 | * ** $ISSPAM should become part of domain object and is written to the local part of the mail. | 37 | * ** $ISSPAM should become part of domain object and is written to the local part of the mail. |
35 | * ** => value could be calculated by the server directly | 38 | * ** => value could be calculated by the server directly |
36 | */ | 39 | */ |
37 | // template <typename DomainType> | ||
38 | class SimpleProcessor : public Akonadi2::Preprocessor | 40 | class SimpleProcessor : public Akonadi2::Preprocessor |
39 | { | 41 | { |
40 | public: | 42 | public: |
@@ -96,21 +98,108 @@ QMap<QString, QString> populate() | |||
96 | 98 | ||
97 | static QMap<QString, QString> s_dataSource = populate(); | 99 | static QMap<QString, QString> s_dataSource = populate(); |
98 | 100 | ||
101 | //Drives the pipeline using the output from all command queues | ||
102 | class Processor : public QObject | ||
103 | { | ||
104 | Q_OBJECT | ||
105 | public: | ||
106 | Processor(Akonadi2::Pipeline *pipeline, QList<MessageQueue*> commandQueues) | ||
107 | : QObject(), | ||
108 | mPipeline(pipeline), | ||
109 | mCommandQueues(commandQueues), | ||
110 | mProcessingLock(false) | ||
111 | { | ||
112 | for (auto queue : mCommandQueues) { | ||
113 | bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process); | ||
114 | Q_ASSERT(ret); | ||
115 | } | ||
116 | } | ||
117 | |||
118 | private slots: | ||
119 | void process() | ||
120 | { | ||
121 | if (mProcessingLock) { | ||
122 | return; | ||
123 | } | ||
124 | mProcessingLock = true; | ||
125 | //Empty queue after queue | ||
126 | //FIXME the for and while loops should be async, otherwise we process all messages in parallel | ||
127 | for (auto queue : mCommandQueues) { | ||
128 | qDebug() << "processing queue"; | ||
129 | bool processedMessage = false; | ||
130 | while (processedMessage) { | ||
131 | qDebug() << "process"; | ||
132 | processedMessage = false; | ||
133 | queue->dequeue([this, &processedMessage](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) { | ||
134 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size); | ||
135 | if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { | ||
136 | qWarning() << "invalid buffer"; | ||
137 | processedMessage = false; | ||
138 | messageQueueCallback(false); | ||
139 | return; | ||
140 | } | ||
141 | auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); | ||
142 | qDebug() << "Dequeued: " << queuedCommand->commandId(); | ||
143 | //Throw command into appropriate pipeline | ||
144 | switch (queuedCommand->commandId()) { | ||
145 | case Akonadi2::Commands::DeleteEntityCommand: | ||
146 | //mPipeline->removedEntity | ||
147 | break; | ||
148 | case Akonadi2::Commands::ModifyEntityCommand: | ||
149 | //mPipeline->modifiedEntity | ||
150 | break; | ||
151 | case Akonadi2::Commands::CreateEntityCommand: { | ||
152 | //TODO job lifetime management | ||
153 | auto job = mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<void>([&messageQueueCallback](Async::Future<void> future) { | ||
154 | messageQueueCallback(true); | ||
155 | }); | ||
156 | job.exec(); | ||
157 | } | ||
158 | break; | ||
159 | default: | ||
160 | //Unhandled command | ||
161 | qWarning() << "Unhandled command"; | ||
162 | messageQueueCallback(true); | ||
163 | break; | ||
164 | } | ||
165 | processedMessage = true; | ||
166 | }, | ||
167 | [&processedMessage](const MessageQueue::Error &error) { | ||
168 | processedMessage = false; | ||
169 | }); | ||
170 | } | ||
171 | } | ||
172 | mProcessingLock = false; | ||
173 | } | ||
174 | |||
175 | private: | ||
176 | Akonadi2::Pipeline *mPipeline; | ||
177 | //Ordered by priority | ||
178 | QList<MessageQueue*> mCommandQueues; | ||
179 | bool mProcessingLock; | ||
180 | }; | ||
181 | |||
99 | DummyResource::DummyResource() | 182 | DummyResource::DummyResource() |
100 | : Akonadi2::Resource() | 183 | : Akonadi2::Resource(), |
184 | mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.userqueue"), | ||
185 | mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue") | ||
101 | { | 186 | { |
102 | } | 187 | } |
103 | 188 | ||
104 | void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) | 189 | void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) |
105 | { | 190 | { |
106 | auto eventFactory = QSharedPointer<DummyEventAdaptorFactory>::create(); | 191 | auto eventFactory = QSharedPointer<DummyEventAdaptorFactory>::create(); |
192 | //FIXME we should setup for each resource entity type, not for each domain type | ||
193 | //i.e. If a resource stores tags as part of each message it needs to update the tag index | ||
107 | //TODO setup preprocessors for each domain type and pipeline type allowing full customization | 194 | //TODO setup preprocessors for each domain type and pipeline type allowing full customization |
108 | //Eventually the order should be self configuring, for now it's hardcoded. | 195 | //Eventually the order should be self configuring, for now it's hardcoded. |
109 | auto eventIndexer = new SimpleProcessor([eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) { | 196 | auto eventIndexer = new SimpleProcessor([eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) { |
110 | auto adaptor = eventFactory->createAdaptor(entity); | 197 | auto adaptor = eventFactory->createAdaptor(entity); |
111 | qDebug() << "Summary preprocessor: " << adaptor->getProperty("summary").toString(); | 198 | qDebug() << "Summary preprocessor: " << adaptor->getProperty("summary").toString(); |
112 | }); | 199 | }); |
113 | pipeline->setPreprocessors<Akonadi2::Domain::Event>(Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer); | 200 | //event is the entitytype and not the domain type |
201 | pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer); | ||
202 | mProcessor = new Processor(pipeline, QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); | ||
114 | } | 203 | } |
115 | 204 | ||
116 | void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &rid, std::function<void(void *keyValue, int keySize, void *dataValue, int dataSize)> callback) | 205 | void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &rid, std::function<void(void *keyValue, int keySize, void *dataValue, int dataSize)> callback) |
@@ -139,6 +228,7 @@ void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &ri | |||
139 | 228 | ||
140 | Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) | 229 | Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) |
141 | { | 230 | { |
231 | qDebug() << "synchronizeWithSource"; | ||
142 | return Async::start<void>([this, pipeline](Async::Future<void> &f) { | 232 | return Async::start<void>([this, pipeline](Async::Future<void> &f) { |
143 | //TODO use a read-only transaction during the complete sync to sync against a defined revision | 233 | //TODO use a read-only transaction during the complete sync to sync against a defined revision |
144 | auto storage = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); | 234 | auto storage = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); |
@@ -171,7 +261,9 @@ Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli | |||
171 | DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); | 261 | DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); |
172 | //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. | 262 | //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. |
173 | const auto key = QUuid::createUuid().toString().toUtf8(); | 263 | const auto key = QUuid::createUuid().toString().toUtf8(); |
174 | pipeline->newEntity<Akonadi2::Domain::Event>(key, m_fbb.GetBufferPointer(), m_fbb.GetSize()); | 264 | //TODO Create queuedcommand and push into synchronizerQueue |
265 | //* create message in store directly, add command to messagequeue waiting for processing. | ||
266 | // pipeline->newEntity<Akonadi2::Domain::Event>(key, m_fbb.GetBufferPointer(), m_fbb.GetSize()); | ||
175 | } else { //modification | 267 | } else { //modification |
176 | //TODO diff and create modification if necessary | 268 | //TODO diff and create modification if necessary |
177 | } | 269 | } |
@@ -183,16 +275,18 @@ Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli | |||
183 | 275 | ||
184 | void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) | 276 | void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) |
185 | { | 277 | { |
186 | Q_UNUSED(commandId) | 278 | qDebug() << "processCommand"; |
187 | Q_UNUSED(data) | 279 | //TODO instead of copying the command including the full entity first into the command queue, we could directly |
188 | Q_UNUSED(size) | 280 | //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). |
189 | //TODO reallly process the commands :) | 281 | //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire). |
190 | auto builder = DummyCalendar::DummyEventBuilder(m_fbb); | ||
191 | builder .add_summary(m_fbb.CreateString("summary summary!")); | ||
192 | auto buffer = builder.Finish(); | ||
193 | DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); | ||
194 | pipeline->newEntity<Akonadi2::Domain::Event>("fakekey", m_fbb.GetBufferPointer(), m_fbb.GetSize()); | ||
195 | m_fbb.Clear(); | 282 | m_fbb.Clear(); |
283 | auto commandData = m_fbb.CreateVector(reinterpret_cast<uint8_t const *>(data.data()), data.size()); | ||
284 | auto builder = Akonadi2::QueuedCommandBuilder(m_fbb); | ||
285 | builder.add_commandId(commandId); | ||
286 | builder.add_command(commandData); | ||
287 | auto buffer = builder.Finish(); | ||
288 | Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer); | ||
289 | mUserQueue.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); | ||
196 | } | 290 | } |
197 | 291 | ||
198 | DummyResourceFactory::DummyResourceFactory(QObject *parent) | 292 | DummyResourceFactory::DummyResourceFactory(QObject *parent) |
@@ -211,3 +305,4 @@ void DummyResourceFactory::registerFacades(Akonadi2::FacadeFactory &factory) | |||
211 | factory.registerFacade<Akonadi2::Domain::Event, DummyResourceFacade>(PLUGIN_NAME); | 305 | factory.registerFacade<Akonadi2::Domain::Event, DummyResourceFacade>(PLUGIN_NAME); |
212 | } | 306 | } |
213 | 307 | ||
308 | #include "resourcefactory.moc" | ||
diff --git a/dummyresource/resourcefactory.h b/dummyresource/resourcefactory.h index 427fcc6..682f25c 100644 --- a/dummyresource/resourcefactory.h +++ b/dummyresource/resourcefactory.h | |||
@@ -21,12 +21,15 @@ | |||
21 | 21 | ||
22 | #include "common/resource.h" | 22 | #include "common/resource.h" |
23 | #include "async/src/async.h" | 23 | #include "async/src/async.h" |
24 | #include "common/messagequeue.h" | ||
24 | 25 | ||
25 | #include <flatbuffers/flatbuffers.h> | 26 | #include <flatbuffers/flatbuffers.h> |
26 | 27 | ||
27 | //TODO: a little ugly to have this in two places, once here and once in Q_PLUGIN_METADATA | 28 | //TODO: a little ugly to have this in two places, once here and once in Q_PLUGIN_METADATA |
28 | #define PLUGIN_NAME "org.kde.dummy" | 29 | #define PLUGIN_NAME "org.kde.dummy" |
29 | 30 | ||
31 | class Processor; | ||
32 | |||
30 | class DummyResource : public Akonadi2::Resource | 33 | class DummyResource : public Akonadi2::Resource |
31 | { | 34 | { |
32 | public: | 35 | public: |
@@ -37,6 +40,9 @@ public: | |||
37 | 40 | ||
38 | private: | 41 | private: |
39 | flatbuffers::FlatBufferBuilder m_fbb; | 42 | flatbuffers::FlatBufferBuilder m_fbb; |
43 | MessageQueue mUserQueue; | ||
44 | MessageQueue mSynchronizerQueue; | ||
45 | Processor *mProcessor; | ||
40 | }; | 46 | }; |
41 | 47 | ||
42 | class DummyResourceFactory : public Akonadi2::ResourceFactory | 48 | class DummyResourceFactory : public Akonadi2::ResourceFactory |