diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-01-02 22:39:25 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-01-02 22:39:25 +0100 |
commit | 91d915a09b7d52c10edb1d4c1298fc2885b8a257 (patch) | |
tree | 407e9a2a1c902a68c78a7d08aab80ba47565fa88 /common | |
parent | 817bff01d15395206c1cc637d0c9ac0905007bf1 (diff) | |
download | sink-91d915a09b7d52c10edb1d4c1298fc2885b8a257.tar.gz sink-91d915a09b7d52c10edb1d4c1298fc2885b8a257.zip |
DomainTypeAdaptor factory, per type preprocessor pipeline configuration.
Diffstat (limited to 'common')
-rw-r--r-- | common/entitybuffer.cpp | 5 | ||||
-rw-r--r-- | common/entitybuffer.h | 1 | ||||
-rw-r--r-- | common/metadata.fbs | 1 | ||||
-rw-r--r-- | common/pipeline.cpp | 44 | ||||
-rw-r--r-- | common/pipeline.h | 37 | ||||
-rw-r--r-- | common/resource.cpp | 5 | ||||
-rw-r--r-- | common/resource.h | 2 |
7 files changed, 77 insertions, 18 deletions
diff --git a/common/entitybuffer.cpp b/common/entitybuffer.cpp index b9c9d76..c5d6bce 100644 --- a/common/entitybuffer.cpp +++ b/common/entitybuffer.cpp | |||
@@ -18,6 +18,11 @@ EntityBuffer::EntityBuffer(void *dataValue, int dataSize) | |||
18 | } | 18 | } |
19 | } | 19 | } |
20 | 20 | ||
21 | const Akonadi2::Entity &EntityBuffer::entity() | ||
22 | { | ||
23 | return *mEntity; | ||
24 | } | ||
25 | |||
21 | const flatbuffers::Vector<uint8_t>* EntityBuffer::resourceBuffer() | 26 | const flatbuffers::Vector<uint8_t>* EntityBuffer::resourceBuffer() |
22 | { | 27 | { |
23 | if (!mEntity) { | 28 | if (!mEntity) { |
diff --git a/common/entitybuffer.h b/common/entitybuffer.h index c072777..bd9360d 100644 --- a/common/entitybuffer.h +++ b/common/entitybuffer.h | |||
@@ -12,6 +12,7 @@ public: | |||
12 | const flatbuffers::Vector<uint8_t> *resourceBuffer(); | 12 | const flatbuffers::Vector<uint8_t> *resourceBuffer(); |
13 | const flatbuffers::Vector<uint8_t> *metadataBuffer(); | 13 | const flatbuffers::Vector<uint8_t> *metadataBuffer(); |
14 | const flatbuffers::Vector<uint8_t> *localBuffer(); | 14 | const flatbuffers::Vector<uint8_t> *localBuffer(); |
15 | const Entity &entity(); | ||
15 | 16 | ||
16 | static void extractResourceBuffer(void *dataValue, int dataSize, const std::function<void(const flatbuffers::Vector<uint8_t> *)> &handler); | 17 | static void extractResourceBuffer(void *dataValue, int dataSize, const std::function<void(const flatbuffers::Vector<uint8_t> *)> &handler); |
17 | static void assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, void *metadataData, size_t metadataSize, void *resourceData, size_t resourceSize, void *localData, size_t localSize); | 18 | static void assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, void *metadataData, size_t metadataSize, void *resourceData, size_t resourceSize, void *localData, size_t localSize); |
diff --git a/common/metadata.fbs b/common/metadata.fbs index 71684b6..34a8df2 100644 --- a/common/metadata.fbs +++ b/common/metadata.fbs | |||
@@ -2,6 +2,7 @@ namespace Akonadi2; | |||
2 | 2 | ||
3 | table Metadata { | 3 | table Metadata { |
4 | revision: ulong; | 4 | revision: ulong; |
5 | processed: bool = true; | ||
5 | } | 6 | } |
6 | 7 | ||
7 | root_type Metadata; | 8 | root_type Metadata; |
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 04954ac..8d00480 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -41,10 +41,10 @@ public: | |||
41 | } | 41 | } |
42 | 42 | ||
43 | Storage storage; | 43 | Storage storage; |
44 | QVector<Preprocessor *> nullPipeline; | 44 | QHash<QString, QVector<Preprocessor *> > nullPipeline; |
45 | QVector<Preprocessor *> newPipeline; | 45 | QHash<QString, QVector<Preprocessor *> > newPipeline; |
46 | QVector<Preprocessor *> modifiedPipeline; | 46 | QHash<QString, QVector<Preprocessor *> > modifiedPipeline; |
47 | QVector<Preprocessor *> deletedPipeline; | 47 | QHash<QString, QVector<Preprocessor *> > deletedPipeline; |
48 | QVector<PipelineState> activePipelines; | 48 | QVector<PipelineState> activePipelines; |
49 | bool stepScheduled; | 49 | bool stepScheduled; |
50 | }; | 50 | }; |
@@ -60,6 +60,23 @@ Pipeline::~Pipeline() | |||
60 | delete d; | 60 | delete d; |
61 | } | 61 | } |
62 | 62 | ||
63 | void Pipeline::setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors) | ||
64 | { | ||
65 | switch (pipelineType) { | ||
66 | case NewPipeline: | ||
67 | d->newPipeline[entityType] = preprocessors; | ||
68 | break; | ||
69 | case ModifiedPipeline: | ||
70 | d->modifiedPipeline[entityType] = preprocessors; | ||
71 | break; | ||
72 | case DeletedPipeline: | ||
73 | d->deletedPipeline[entityType] = preprocessors; | ||
74 | break; | ||
75 | default: | ||
76 | break; | ||
77 | }; | ||
78 | } | ||
79 | |||
63 | Storage &Pipeline::storage() const | 80 | Storage &Pipeline::storage() const |
64 | { | 81 | { |
65 | return d->storage; | 82 | return d->storage; |
@@ -68,12 +85,12 @@ Storage &Pipeline::storage() const | |||
68 | void Pipeline::null() | 85 | void Pipeline::null() |
69 | { | 86 | { |
70 | //TODO: is there really any need for the null pipeline? if so, it should be doing something ;) | 87 | //TODO: is there really any need for the null pipeline? if so, it should be doing something ;) |
71 | PipelineState state(this, NullPipeline, QByteArray(), d->nullPipeline); | 88 | // PipelineState state(this, NullPipeline, QByteArray(), d->nullPipeline); |
72 | d->activePipelines << state; | 89 | // d->activePipelines << state; |
73 | state.step(); | 90 | // state.step(); |
74 | } | 91 | } |
75 | 92 | ||
76 | void Pipeline::newEntity(const QByteArray &key, void *resourceBufferData, size_t size) | 93 | void Pipeline::newEntity(const QString &entityType, const QByteArray &key, void *resourceBufferData, size_t size) |
77 | { | 94 | { |
78 | const qint64 newRevision = storage().maxRevision() + 1; | 95 | const qint64 newRevision = storage().maxRevision() + 1; |
79 | 96 | ||
@@ -81,6 +98,7 @@ void Pipeline::newEntity(const QByteArray &key, void *resourceBufferData, size_t | |||
81 | flatbuffers::FlatBufferBuilder metadataFbb; | 98 | flatbuffers::FlatBufferBuilder metadataFbb; |
82 | auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); | 99 | auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); |
83 | metadataBuilder.add_revision(newRevision); | 100 | metadataBuilder.add_revision(newRevision); |
101 | metadataBuilder.add_processed(false); | ||
84 | auto metadataBuffer = metadataBuilder.Finish(); | 102 | auto metadataBuffer = metadataBuilder.Finish(); |
85 | Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); | 103 | Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); |
86 | 104 | ||
@@ -90,21 +108,21 @@ void Pipeline::newEntity(const QByteArray &key, void *resourceBufferData, size_t | |||
90 | storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); | 108 | storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); |
91 | storage().setMaxRevision(newRevision); | 109 | storage().setMaxRevision(newRevision); |
92 | 110 | ||
93 | PipelineState state(this, NewPipeline, key, d->newPipeline); | 111 | PipelineState state(this, NewPipeline, key, d->newPipeline[entityType]); |
94 | d->activePipelines << state; | 112 | d->activePipelines << state; |
95 | state.step(); | 113 | state.step(); |
96 | } | 114 | } |
97 | 115 | ||
98 | void Pipeline::modifiedEntity(const QByteArray &key, void *data, size_t size) | 116 | void Pipeline::modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size) |
99 | { | 117 | { |
100 | PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline); | 118 | PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType]); |
101 | d->activePipelines << state; | 119 | d->activePipelines << state; |
102 | state.step(); | 120 | state.step(); |
103 | } | 121 | } |
104 | 122 | ||
105 | void Pipeline::deletedEntity(const QByteArray &key) | 123 | void Pipeline::deletedEntity(const QString &entityType, const QByteArray &key) |
106 | { | 124 | { |
107 | PipelineState state(this, DeletedPipeline, key, d->deletedPipeline); | 125 | PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType]); |
108 | d->activePipelines << state; | 126 | d->activePipelines << state; |
109 | state.step(); | 127 | state.step(); |
110 | } | 128 | } |
diff --git a/common/pipeline.h b/common/pipeline.h index 6ef8703..8373899 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -27,6 +27,7 @@ | |||
27 | 27 | ||
28 | #include <akonadi2common_export.h> | 28 | #include <akonadi2common_export.h> |
29 | #include <storage.h> | 29 | #include <storage.h> |
30 | #include <clientapi.h> //For domain types | ||
30 | 31 | ||
31 | namespace Akonadi2 | 32 | namespace Akonadi2 |
32 | { | 33 | { |
@@ -46,12 +47,34 @@ public: | |||
46 | 47 | ||
47 | Storage &storage() const; | 48 | Storage &storage() const; |
48 | 49 | ||
50 | // template <typename T> | ||
51 | // Storage &storage() const; | ||
52 | |||
53 | template <typename T> | ||
54 | void setPreprocessors(Type type, const QVector<Preprocessor *> &preprocessors) | ||
55 | { | ||
56 | setPreprocessors(Akonadi2::Domain::getTypeName<T>(), type, preprocessors); | ||
57 | } | ||
58 | |||
49 | void null(); | 59 | void null(); |
50 | //FIXME We should probably directly provide a DomainTypeAdapter here. The data has already been written and we only need to read it for processing. And we need to read all buffers. | 60 | |
51 | void newEntity(const QByteArray &key, void *resourceBufferData, size_t size); | 61 | template <typename T> |
52 | //TODO Send local buffer data as well? | 62 | void newEntity(const QByteArray &key, void *resourceBufferData, size_t size) |
53 | void modifiedEntity(const QByteArray &key, void *data, size_t size); | 63 | { |
54 | void deletedEntity(const QByteArray &key); | 64 | newEntity(Akonadi2::Domain::getTypeName<T>(), key, resourceBufferData, size); |
65 | } | ||
66 | |||
67 | template <typename T> | ||
68 | void modifiedEntity(const QByteArray &key, void *data, size_t size) | ||
69 | { | ||
70 | modifiedEntity(Akonadi2::Domain::getTypeName<T>(), key, data, size); | ||
71 | } | ||
72 | |||
73 | template <typename T> | ||
74 | void deletedEntity(const QByteArray &key) | ||
75 | { | ||
76 | deletedEntity(Akonadi2::Domain::getTypeName<T>(), key); | ||
77 | } | ||
55 | 78 | ||
56 | Q_SIGNALS: | 79 | Q_SIGNALS: |
57 | void revisionUpdated(); | 80 | void revisionUpdated(); |
@@ -61,6 +84,10 @@ private Q_SLOTS: | |||
61 | void stepPipelines(); | 84 | void stepPipelines(); |
62 | 85 | ||
63 | private: | 86 | private: |
87 | void setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors); | ||
88 | void newEntity(const QString &entityType, const QByteArray &key, void *resourceBufferData, size_t size); | ||
89 | void modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size); | ||
90 | void deletedEntity(const QString &entityType, const QByteArray &key); | ||
64 | void pipelineStepped(const PipelineState &state); | 91 | void pipelineStepped(const PipelineState &state); |
65 | void pipelineCompleted(const PipelineState &state); | 92 | void pipelineCompleted(const PipelineState &state); |
66 | void scheduleStep(); | 93 | void scheduleStep(); |
diff --git a/common/resource.cpp b/common/resource.cpp index bba6609..db08c4f 100644 --- a/common/resource.cpp +++ b/common/resource.cpp | |||
@@ -39,6 +39,11 @@ Resource::~Resource() | |||
39 | //delete d; | 39 | //delete d; |
40 | } | 40 | } |
41 | 41 | ||
42 | void Resource::configurePipeline(Pipeline *pipeline) | ||
43 | { | ||
44 | |||
45 | } | ||
46 | |||
42 | void Resource::processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline) | 47 | void Resource::processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline) |
43 | { | 48 | { |
44 | Q_UNUSED(commandId) | 49 | Q_UNUSED(commandId) |
diff --git a/common/resource.h b/common/resource.h index fb42c1b..52a28a6 100644 --- a/common/resource.h +++ b/common/resource.h | |||
@@ -36,6 +36,8 @@ public: | |||
36 | virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); | 36 | virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); |
37 | virtual Async::Job<void> synchronizeWithSource(Pipeline *pipeline); | 37 | virtual Async::Job<void> synchronizeWithSource(Pipeline *pipeline); |
38 | 38 | ||
39 | virtual void configurePipeline(Pipeline *pipeline); | ||
40 | |||
39 | private: | 41 | private: |
40 | class Private; | 42 | class Private; |
41 | Private * const d; | 43 | Private * const d; |