summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-01-02 22:39:25 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-01-02 22:39:25 +0100
commit91d915a09b7d52c10edb1d4c1298fc2885b8a257 (patch)
tree407e9a2a1c902a68c78a7d08aab80ba47565fa88 /common
parent817bff01d15395206c1cc637d0c9ac0905007bf1 (diff)
downloadsink-91d915a09b7d52c10edb1d4c1298fc2885b8a257.tar.gz
sink-91d915a09b7d52c10edb1d4c1298fc2885b8a257.zip
DomainTypeAdaptor factory, per type preprocessor pipeline configuration.
Diffstat (limited to 'common')
-rw-r--r--common/entitybuffer.cpp5
-rw-r--r--common/entitybuffer.h1
-rw-r--r--common/metadata.fbs1
-rw-r--r--common/pipeline.cpp44
-rw-r--r--common/pipeline.h37
-rw-r--r--common/resource.cpp5
-rw-r--r--common/resource.h2
7 files changed, 77 insertions, 18 deletions
diff --git a/common/entitybuffer.cpp b/common/entitybuffer.cpp
index b9c9d76..c5d6bce 100644
--- a/common/entitybuffer.cpp
+++ b/common/entitybuffer.cpp
@@ -18,6 +18,11 @@ EntityBuffer::EntityBuffer(void *dataValue, int dataSize)
18 } 18 }
19} 19}
20 20
21const Akonadi2::Entity &EntityBuffer::entity()
22{
23 return *mEntity;
24}
25
21const flatbuffers::Vector<uint8_t>* EntityBuffer::resourceBuffer() 26const flatbuffers::Vector<uint8_t>* EntityBuffer::resourceBuffer()
22{ 27{
23 if (!mEntity) { 28 if (!mEntity) {
diff --git a/common/entitybuffer.h b/common/entitybuffer.h
index c072777..bd9360d 100644
--- a/common/entitybuffer.h
+++ b/common/entitybuffer.h
@@ -12,6 +12,7 @@ public:
12 const flatbuffers::Vector<uint8_t> *resourceBuffer(); 12 const flatbuffers::Vector<uint8_t> *resourceBuffer();
13 const flatbuffers::Vector<uint8_t> *metadataBuffer(); 13 const flatbuffers::Vector<uint8_t> *metadataBuffer();
14 const flatbuffers::Vector<uint8_t> *localBuffer(); 14 const flatbuffers::Vector<uint8_t> *localBuffer();
15 const Entity &entity();
15 16
16 static void extractResourceBuffer(void *dataValue, int dataSize, const std::function<void(const flatbuffers::Vector<uint8_t> *)> &handler); 17 static void extractResourceBuffer(void *dataValue, int dataSize, const std::function<void(const flatbuffers::Vector<uint8_t> *)> &handler);
17 static void assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, void *metadataData, size_t metadataSize, void *resourceData, size_t resourceSize, void *localData, size_t localSize); 18 static void assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, void *metadataData, size_t metadataSize, void *resourceData, size_t resourceSize, void *localData, size_t localSize);
diff --git a/common/metadata.fbs b/common/metadata.fbs
index 71684b6..34a8df2 100644
--- a/common/metadata.fbs
+++ b/common/metadata.fbs
@@ -2,6 +2,7 @@ namespace Akonadi2;
2 2
3table Metadata { 3table Metadata {
4 revision: ulong; 4 revision: ulong;
5 processed: bool = true;
5} 6}
6 7
7root_type Metadata; 8root_type Metadata;
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 04954ac..8d00480 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -41,10 +41,10 @@ public:
41 } 41 }
42 42
43 Storage storage; 43 Storage storage;
44 QVector<Preprocessor *> nullPipeline; 44 QHash<QString, QVector<Preprocessor *> > nullPipeline;
45 QVector<Preprocessor *> newPipeline; 45 QHash<QString, QVector<Preprocessor *> > newPipeline;
46 QVector<Preprocessor *> modifiedPipeline; 46 QHash<QString, QVector<Preprocessor *> > modifiedPipeline;
47 QVector<Preprocessor *> deletedPipeline; 47 QHash<QString, QVector<Preprocessor *> > deletedPipeline;
48 QVector<PipelineState> activePipelines; 48 QVector<PipelineState> activePipelines;
49 bool stepScheduled; 49 bool stepScheduled;
50}; 50};
@@ -60,6 +60,23 @@ Pipeline::~Pipeline()
60 delete d; 60 delete d;
61} 61}
62 62
63void Pipeline::setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors)
64{
65 switch (pipelineType) {
66 case NewPipeline:
67 d->newPipeline[entityType] = preprocessors;
68 break;
69 case ModifiedPipeline:
70 d->modifiedPipeline[entityType] = preprocessors;
71 break;
72 case DeletedPipeline:
73 d->deletedPipeline[entityType] = preprocessors;
74 break;
75 default:
76 break;
77 };
78}
79
63Storage &Pipeline::storage() const 80Storage &Pipeline::storage() const
64{ 81{
65 return d->storage; 82 return d->storage;
@@ -68,12 +85,12 @@ Storage &Pipeline::storage() const
68void Pipeline::null() 85void Pipeline::null()
69{ 86{
70 //TODO: is there really any need for the null pipeline? if so, it should be doing something ;) 87 //TODO: is there really any need for the null pipeline? if so, it should be doing something ;)
71 PipelineState state(this, NullPipeline, QByteArray(), d->nullPipeline); 88 // PipelineState state(this, NullPipeline, QByteArray(), d->nullPipeline);
72 d->activePipelines << state; 89 // d->activePipelines << state;
73 state.step(); 90 // state.step();
74} 91}
75 92
76void Pipeline::newEntity(const QByteArray &key, void *resourceBufferData, size_t size) 93void Pipeline::newEntity(const QString &entityType, const QByteArray &key, void *resourceBufferData, size_t size)
77{ 94{
78 const qint64 newRevision = storage().maxRevision() + 1; 95 const qint64 newRevision = storage().maxRevision() + 1;
79 96
@@ -81,6 +98,7 @@ void Pipeline::newEntity(const QByteArray &key, void *resourceBufferData, size_t
81 flatbuffers::FlatBufferBuilder metadataFbb; 98 flatbuffers::FlatBufferBuilder metadataFbb;
82 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); 99 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb);
83 metadataBuilder.add_revision(newRevision); 100 metadataBuilder.add_revision(newRevision);
101 metadataBuilder.add_processed(false);
84 auto metadataBuffer = metadataBuilder.Finish(); 102 auto metadataBuffer = metadataBuilder.Finish();
85 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); 103 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer);
86 104
@@ -90,21 +108,21 @@ void Pipeline::newEntity(const QByteArray &key, void *resourceBufferData, size_t
90 storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); 108 storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize());
91 storage().setMaxRevision(newRevision); 109 storage().setMaxRevision(newRevision);
92 110
93 PipelineState state(this, NewPipeline, key, d->newPipeline); 111 PipelineState state(this, NewPipeline, key, d->newPipeline[entityType]);
94 d->activePipelines << state; 112 d->activePipelines << state;
95 state.step(); 113 state.step();
96} 114}
97 115
98void Pipeline::modifiedEntity(const QByteArray &key, void *data, size_t size) 116void Pipeline::modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size)
99{ 117{
100 PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline); 118 PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType]);
101 d->activePipelines << state; 119 d->activePipelines << state;
102 state.step(); 120 state.step();
103} 121}
104 122
105void Pipeline::deletedEntity(const QByteArray &key) 123void Pipeline::deletedEntity(const QString &entityType, const QByteArray &key)
106{ 124{
107 PipelineState state(this, DeletedPipeline, key, d->deletedPipeline); 125 PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType]);
108 d->activePipelines << state; 126 d->activePipelines << state;
109 state.step(); 127 state.step();
110} 128}
diff --git a/common/pipeline.h b/common/pipeline.h
index 6ef8703..8373899 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -27,6 +27,7 @@
27 27
28#include <akonadi2common_export.h> 28#include <akonadi2common_export.h>
29#include <storage.h> 29#include <storage.h>
30#include <clientapi.h> //For domain types
30 31
31namespace Akonadi2 32namespace Akonadi2
32{ 33{
@@ -46,12 +47,34 @@ public:
46 47
47 Storage &storage() const; 48 Storage &storage() const;
48 49
50 // template <typename T>
51 // Storage &storage() const;
52
53 template <typename T>
54 void setPreprocessors(Type type, const QVector<Preprocessor *> &preprocessors)
55 {
56 setPreprocessors(Akonadi2::Domain::getTypeName<T>(), type, preprocessors);
57 }
58
49 void null(); 59 void null();
50 //FIXME We should probably directly provide a DomainTypeAdapter here. The data has already been written and we only need to read it for processing. And we need to read all buffers. 60
51 void newEntity(const QByteArray &key, void *resourceBufferData, size_t size); 61 template <typename T>
52 //TODO Send local buffer data as well? 62 void newEntity(const QByteArray &key, void *resourceBufferData, size_t size)
53 void modifiedEntity(const QByteArray &key, void *data, size_t size); 63 {
54 void deletedEntity(const QByteArray &key); 64 newEntity(Akonadi2::Domain::getTypeName<T>(), key, resourceBufferData, size);
65 }
66
67 template <typename T>
68 void modifiedEntity(const QByteArray &key, void *data, size_t size)
69 {
70 modifiedEntity(Akonadi2::Domain::getTypeName<T>(), key, data, size);
71 }
72
73 template <typename T>
74 void deletedEntity(const QByteArray &key)
75 {
76 deletedEntity(Akonadi2::Domain::getTypeName<T>(), key);
77 }
55 78
56Q_SIGNALS: 79Q_SIGNALS:
57 void revisionUpdated(); 80 void revisionUpdated();
@@ -61,6 +84,10 @@ private Q_SLOTS:
61 void stepPipelines(); 84 void stepPipelines();
62 85
63private: 86private:
87 void setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors);
88 void newEntity(const QString &entityType, const QByteArray &key, void *resourceBufferData, size_t size);
89 void modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size);
90 void deletedEntity(const QString &entityType, const QByteArray &key);
64 void pipelineStepped(const PipelineState &state); 91 void pipelineStepped(const PipelineState &state);
65 void pipelineCompleted(const PipelineState &state); 92 void pipelineCompleted(const PipelineState &state);
66 void scheduleStep(); 93 void scheduleStep();
diff --git a/common/resource.cpp b/common/resource.cpp
index bba6609..db08c4f 100644
--- a/common/resource.cpp
+++ b/common/resource.cpp
@@ -39,6 +39,11 @@ Resource::~Resource()
39 //delete d; 39 //delete d;
40} 40}
41 41
42void Resource::configurePipeline(Pipeline *pipeline)
43{
44
45}
46
42void Resource::processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline) 47void Resource::processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline)
43{ 48{
44 Q_UNUSED(commandId) 49 Q_UNUSED(commandId)
diff --git a/common/resource.h b/common/resource.h
index fb42c1b..52a28a6 100644
--- a/common/resource.h
+++ b/common/resource.h
@@ -36,6 +36,8 @@ public:
36 virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); 36 virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline);
37 virtual Async::Job<void> synchronizeWithSource(Pipeline *pipeline); 37 virtual Async::Job<void> synchronizeWithSource(Pipeline *pipeline);
38 38
39 virtual void configurePipeline(Pipeline *pipeline);
40
39private: 41private:
40 class Private; 42 class Private;
41 Private * const d; 43 Private * const d;