summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-01-15 01:56:09 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-01-15 01:56:09 +0100
commitbc2a95cad05e454a84c317f1078edb329bd3afd4 (patch)
tree4cb54032b075a8730a532ccd7485db598e859ac8
parent70bb707903da21103b84e0f3effcaa0a24612d5c (diff)
downloadsink-bc2a95cad05e454a84c317f1078edb329bd3afd4.tar.gz
sink-bc2a95cad05e454a84c317f1078edb329bd3afd4.zip
Writing from facade.
-rw-r--r--common/CMakeLists.txt1
-rw-r--r--common/commands/createentity.fbs2
-rw-r--r--common/entitybuffer.cpp9
-rw-r--r--common/entitybuffer.h2
-rw-r--r--common/metadata.fbs1
-rw-r--r--common/pipeline.cpp79
-rw-r--r--common/pipeline.h41
-rw-r--r--common/queuedcommand.fbs11
-rw-r--r--common/resourceaccess.h2
-rw-r--r--dummyresource/domainadaptor.cpp5
-rw-r--r--dummyresource/facade.cpp24
-rw-r--r--dummyresource/resourcefactory.cpp123
-rw-r--r--dummyresource/resourcefactory.h6
-rw-r--r--tests/dummyresourcetest.cpp83
14 files changed, 293 insertions, 96 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index 671d1cd..6f8fee3 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -10,6 +10,7 @@ generate_flatbuffers(
10 domain/event 10 domain/event
11 entity 11 entity
12 metadata 12 metadata
13 queuedcommand
13) 14)
14 15
15if (STORAGE_unqlite) 16if (STORAGE_unqlite)
diff --git a/common/commands/createentity.fbs b/common/commands/createentity.fbs
index 564c231..23eeff9 100644
--- a/common/commands/createentity.fbs
+++ b/common/commands/createentity.fbs
@@ -1,4 +1,4 @@
1namespace Akonadi2; 1namespace Akonadi2.Commands;
2 2
3table CreateEntity { 3table CreateEntity {
4 domainType: string; 4 domainType: string;
diff --git a/common/entitybuffer.cpp b/common/entitybuffer.cpp
index aa5847c..4af84ef 100644
--- a/common/entitybuffer.cpp
+++ b/common/entitybuffer.cpp
@@ -56,11 +56,12 @@ void EntityBuffer::extractResourceBuffer(void *dataValue, int dataSize, const st
56 } 56 }
57} 57}
58 58
59void EntityBuffer::assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, void *metadataData, size_t metadataSize, void *resourceData, size_t resourceSize, void *localData, size_t localSize) 59void EntityBuffer::assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, void const *metadataData, size_t metadataSize, void const *resourceData, size_t resourceSize, void const *localData, size_t localSize)
60{ 60{
61 auto metadata = fbb.CreateVector<uint8_t>(static_cast<uint8_t*>(metadataData), metadataSize); 61 qDebug() << "res size: " << resourceSize;
62 auto resource = fbb.CreateVector<uint8_t>(static_cast<uint8_t*>(resourceData), resourceSize); 62 auto metadata = fbb.CreateVector<uint8_t>(static_cast<uint8_t const*>(metadataData), metadataSize);
63 auto local = fbb.CreateVector<uint8_t>(static_cast<uint8_t*>(localData), localSize); 63 auto resource = fbb.CreateVector<uint8_t>(static_cast<uint8_t const*>(resourceData), resourceSize);
64 auto local = fbb.CreateVector<uint8_t>(static_cast<uint8_t const*>(localData), localSize);
64 auto builder = Akonadi2::EntityBuilder(fbb); 65 auto builder = Akonadi2::EntityBuilder(fbb);
65 builder.add_metadata(metadata); 66 builder.add_metadata(metadata);
66 builder.add_resource(resource); 67 builder.add_resource(resource);
diff --git a/common/entitybuffer.h b/common/entitybuffer.h
index 600b04d..097b450 100644
--- a/common/entitybuffer.h
+++ b/common/entitybuffer.h
@@ -15,7 +15,7 @@ public:
15 const Entity &entity(); 15 const Entity &entity();
16 16
17 static void extractResourceBuffer(void *dataValue, int dataSize, const std::function<void(const uint8_t *, size_t size)> &handler); 17 static void extractResourceBuffer(void *dataValue, int dataSize, const std::function<void(const uint8_t *, size_t size)> &handler);
18 static void assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, void *metadataData, size_t metadataSize, void *resourceData, size_t resourceSize, void *localData, size_t localSize); 18 static void assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, void const *metadataData, size_t metadataSize, void const *resourceData, size_t resourceSize, void const *localData, size_t localSize);
19 19
20private: 20private:
21 const Entity *mEntity; 21 const Entity *mEntity;
diff --git a/common/metadata.fbs b/common/metadata.fbs
index 34a8df2..bb1163d 100644
--- a/common/metadata.fbs
+++ b/common/metadata.fbs
@@ -3,6 +3,7 @@ namespace Akonadi2;
3table Metadata { 3table Metadata {
4 revision: ulong; 4 revision: ulong;
5 processed: bool = true; 5 processed: bool = true;
6 processingProgress: [string];
6} 7}
7 8
8root_type Metadata; 9root_type Metadata;
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 10bae54..9cc7450 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -23,10 +23,13 @@
23#include <QByteArray> 23#include <QByteArray>
24#include <QStandardPaths> 24#include <QStandardPaths>
25#include <QVector> 25#include <QVector>
26#include <QUuid>
26#include <QDebug> 27#include <QDebug>
27#include "entity_generated.h" 28#include "entity_generated.h"
28#include "metadata_generated.h" 29#include "metadata_generated.h"
30#include "createentity_generated.h"
29#include "entitybuffer.h" 31#include "entitybuffer.h"
32#include "async/src/async.h"
30 33
31namespace Akonadi2 34namespace Akonadi2
32{ 35{
@@ -90,39 +93,59 @@ void Pipeline::null()
90 // state.step(); 93 // state.step();
91} 94}
92 95
93void Pipeline::newEntity(const QString &entityType, const QByteArray &key, void *resourceBufferData, size_t size) 96Async::Job<void> Pipeline::newEntity(void const *command, size_t size)
94{ 97{
95 const qint64 newRevision = storage().maxRevision() + 1; 98 qDebug() << "new entity";
96 99 Async::start<void>([&](Async::Future<void> future) {
97 //Add metadata buffer 100
98 flatbuffers::FlatBufferBuilder metadataFbb; 101 //TODO toRFC4122 would probably be more efficient, but results in non-printable keys.
99 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); 102 const auto key = QUuid::createUuid().toString().toUtf8();
100 metadataBuilder.add_revision(newRevision); 103
101 metadataBuilder.add_processed(false); 104 //TODO figure out if we already have created a revision for the message?
102 auto metadataBuffer = metadataBuilder.Finish(); 105 const qint64 newRevision = storage().maxRevision() + 1;
103 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); 106
104 107 auto createEntity = Akonadi2::Commands::GetCreateEntity(command);
105 flatbuffers::FlatBufferBuilder fbb; 108 //TODO rename createEntitiy->domainType to bufferType
106 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), resourceBufferData, size, 0, 0); 109 const QString entityType = QString::fromUtf8(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size());
107 110 auto entity = Akonadi2::GetEntity(createEntity->delta()->Data());
108 storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); 111 //
109 storage().setMaxRevision(newRevision); 112 // const QString entityType;
113 // auto entity = Akonadi2::GetEntity(0);
114
115 //Add metadata buffer
116 flatbuffers::FlatBufferBuilder metadataFbb;
117 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb);
118 metadataBuilder.add_revision(newRevision);
119 metadataBuilder.add_processed(false);
120 auto metadataBuffer = metadataBuilder.Finish();
121 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer);
122 //TODO we should reserve some space in metadata for in-place updates
123
124 flatbuffers::FlatBufferBuilder fbb;
125 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
126
127 storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize());
128 storage().setMaxRevision(newRevision);
129
130 PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() {
131 future.setFinished();
132 });
133 d->activePipelines << state;
134 state.step();
110 135
111 PipelineState state(this, NewPipeline, key, d->newPipeline[entityType]); 136 });
112 d->activePipelines << state;
113 state.step();
114} 137}
115 138
116void Pipeline::modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size) 139void Pipeline::modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size)
117{ 140{
118 PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType]); 141 PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], [](){});
119 d->activePipelines << state; 142 d->activePipelines << state;
120 state.step(); 143 state.step();
121} 144}
122 145
123void Pipeline::deletedEntity(const QString &entityType, const QByteArray &key) 146void Pipeline::deletedEntity(const QString &entityType, const QByteArray &key)
124{ 147{
125 PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType]); 148 PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType], [](){});
126 d->activePipelines << state; 149 d->activePipelines << state;
127 state.step(); 150 state.step();
128} 151}
@@ -160,6 +183,7 @@ void Pipeline::pipelineCompleted(const PipelineState &state)
160 } 183 }
161 184
162 if (state.type() != NullPipeline) { 185 if (state.type() != NullPipeline) {
186 //TODO what revision is finalized?
163 emit revisionUpdated(); 187 emit revisionUpdated();
164 } 188 }
165 scheduleStep(); 189 scheduleStep();
@@ -172,12 +196,13 @@ void Pipeline::pipelineCompleted(const PipelineState &state)
172class PipelineState::Private : public QSharedData 196class PipelineState::Private : public QSharedData
173{ 197{
174public: 198public:
175 Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<Preprocessor *> filters) 199 Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<Preprocessor *> filters, const std::function<void()> &c)
176 : pipeline(p), 200 : pipeline(p),
177 type(t), 201 type(t),
178 key(k), 202 key(k),
179 filterIt(filters), 203 filterIt(filters),
180 idle(true) 204 idle(true),
205 callback(c)
181 {} 206 {}
182 207
183 Private() 208 Private()
@@ -191,6 +216,7 @@ public:
191 QByteArray key; 216 QByteArray key;
192 QVectorIterator<Preprocessor *> filterIt; 217 QVectorIterator<Preprocessor *> filterIt;
193 bool idle; 218 bool idle;
219 std::function<void()> callback;
194}; 220};
195 221
196PipelineState::PipelineState() 222PipelineState::PipelineState()
@@ -199,8 +225,8 @@ PipelineState::PipelineState()
199 225
200} 226}
201 227
202PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters) 228PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, const std::function<void()> &callback)
203 : d(new Private(pipeline, type, key, filters)) 229 : d(new Private(pipeline, type, key, filters, callback))
204{ 230{
205} 231}
206 232
@@ -247,6 +273,7 @@ void PipelineState::step()
247 273
248 d->idle = false; 274 d->idle = false;
249 if (d->filterIt.hasNext()) { 275 if (d->filterIt.hasNext()) {
276 //TODO skip step if already processed
250 d->pipeline->storage().scan(d->key.toStdString(), [this](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { 277 d->pipeline->storage().scan(d->key.toStdString(), [this](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool {
251 auto entity = Akonadi2::GetEntity(dataValue); 278 auto entity = Akonadi2::GetEntity(dataValue);
252 d->filterIt.next()->process(*this, *entity); 279 d->filterIt.next()->process(*this, *entity);
@@ -254,11 +281,13 @@ void PipelineState::step()
254 }); 281 });
255 } else { 282 } else {
256 d->pipeline->pipelineCompleted(*this); 283 d->pipeline->pipelineCompleted(*this);
284 d->callback();
257 } 285 }
258} 286}
259 287
260void PipelineState::processingCompleted(Preprocessor *filter) 288void PipelineState::processingCompleted(Preprocessor *filter)
261{ 289{
290 //TODO record processing progress
262 if (d->pipeline && filter == d->filterIt.peekPrevious()) { 291 if (d->pipeline && filter == d->filterIt.peekPrevious()) {
263 d->idle = true; 292 d->idle = true;
264 d->pipeline->pipelineStepped(*this); 293 d->pipeline->pipelineStepped(*this);
diff --git a/common/pipeline.h b/common/pipeline.h
index 6b847f5..918d21e 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -27,7 +27,7 @@
27 27
28#include <akonadi2common_export.h> 28#include <akonadi2common_export.h>
29#include <storage.h> 29#include <storage.h>
30#include <clientapi.h> //For domain types 30#include "async/src/async.h"
31 31
32#include "entity_generated.h" 32#include "entity_generated.h"
33 33
@@ -49,34 +49,13 @@ public:
49 49
50 Storage &storage() const; 50 Storage &storage() const;
51 51
52 // template <typename T> 52 void setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors);
53 // Storage &storage() const;
54
55 template <typename T>
56 void setPreprocessors(Type type, const QVector<Preprocessor *> &preprocessors)
57 {
58 setPreprocessors(Akonadi2::Domain::getTypeName<T>(), type, preprocessors);
59 }
60 53
61 void null(); 54 void null();
62 55
63 template <typename T> 56 Async::Job<void> newEntity(void const *command, size_t size);
64 void newEntity(const QByteArray &key, void *resourceBufferData, size_t size) 57 void modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size);
65 { 58 void deletedEntity(const QString &entityType, const QByteArray &key);
66 newEntity(Akonadi2::Domain::getTypeName<T>(), key, resourceBufferData, size);
67 }
68
69 template <typename T>
70 void modifiedEntity(const QByteArray &key, void *data, size_t size)
71 {
72 modifiedEntity(Akonadi2::Domain::getTypeName<T>(), key, data, size);
73 }
74
75 template <typename T>
76 void deletedEntity(const QByteArray &key)
77 {
78 deletedEntity(Akonadi2::Domain::getTypeName<T>(), key);
79 }
80 59
81Q_SIGNALS: 60Q_SIGNALS:
82 void revisionUpdated(); 61 void revisionUpdated();
@@ -86,10 +65,6 @@ private Q_SLOTS:
86 void stepPipelines(); 65 void stepPipelines();
87 66
88private: 67private:
89 void setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors);
90 void newEntity(const QString &entityType, const QByteArray &key, void *resourceBufferData, size_t size);
91 void modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size);
92 void deletedEntity(const QString &entityType, const QByteArray &key);
93 void pipelineStepped(const PipelineState &state); 68 void pipelineStepped(const PipelineState &state);
94 void pipelineCompleted(const PipelineState &state); 69 void pipelineCompleted(const PipelineState &state);
95 void scheduleStep(); 70 void scheduleStep();
@@ -104,7 +79,7 @@ class AKONADI2COMMON_EXPORT PipelineState
104{ 79{
105public: 80public:
106 PipelineState(); 81 PipelineState();
107 PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters); 82 PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, const std::function<void()> &callback);
108 PipelineState(const PipelineState &other); 83 PipelineState(const PipelineState &other);
109 ~PipelineState(); 84 ~PipelineState();
110 85
@@ -114,6 +89,7 @@ public:
114 bool isIdle() const; 89 bool isIdle() const;
115 QByteArray key() const; 90 QByteArray key() const;
116 Pipeline::Type type() const; 91 Pipeline::Type type() const;
92 //TODO expose command
117 93
118 void step(); 94 void step();
119 void processingCompleted(Preprocessor *filter); 95 void processingCompleted(Preprocessor *filter);
@@ -129,7 +105,10 @@ public:
129 Preprocessor(); 105 Preprocessor();
130 virtual ~Preprocessor(); 106 virtual ~Preprocessor();
131 107
108 //TODO pass actual command as well, for changerecording
132 virtual void process(PipelineState state, const Akonadi2::Entity &); 109 virtual void process(PipelineState state, const Akonadi2::Entity &);
110 //TODO to record progress
111 // virtual QString id();
133 112
134protected: 113protected:
135 void processingCompleted(PipelineState state); 114 void processingCompleted(PipelineState state);
diff --git a/common/queuedcommand.fbs b/common/queuedcommand.fbs
new file mode 100644
index 0000000..0ca899f
--- /dev/null
+++ b/common/queuedcommand.fbs
@@ -0,0 +1,11 @@
1namespace Akonadi2;
2
3table QueuedCommand {
4 commandId: int;
5 command: [ubyte];
6 // entityId: string;
7 // sourceRevision: ulong;
8 // targetRevision: [ubyte];
9}
10
11root_type QueuedCommand;
diff --git a/common/resourceaccess.h b/common/resourceaccess.h
index d45ebde..0a333f6 100644
--- a/common/resourceaccess.h
+++ b/common/resourceaccess.h
@@ -43,7 +43,7 @@ public:
43 43
44 //TODO use jobs 44 //TODO use jobs
45 void sendCommand(int commandId, const std::function<void()> &callback = std::function<void()>()); 45 void sendCommand(int commandId, const std::function<void()> &callback = std::function<void()>());
46 void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback); 46 void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback = std::function<void()>());
47 Async::Job<void> synchronizeResource(); 47 Async::Job<void> synchronizeResource();
48 48
49public Q_SLOTS: 49public Q_SLOTS:
diff --git a/dummyresource/domainadaptor.cpp b/dummyresource/domainadaptor.cpp
index ae001cf..9bd3770 100644
--- a/dummyresource/domainadaptor.cpp
+++ b/dummyresource/domainadaptor.cpp
@@ -29,8 +29,8 @@ public:
29 29
30 void setProperty(const QString &key, const QVariant &value) 30 void setProperty(const QString &key, const QVariant &value)
31 { 31 {
32 if (mResourceMapper->mWriteAccessors.contains(key)) { 32 if (mResourceMapper && mResourceMapper->mWriteAccessors.contains(key)) {
33 // mResourceMapper.setProperty(key, value, mResourceBuffer); 33 // mResourceMapper->setProperty(key, value, mResourceBuffer);
34 } else { 34 } else {
35 // mLocalMapper.; 35 // mLocalMapper.;
36 } 36 }
@@ -69,6 +69,7 @@ DummyEventAdaptorFactory::DummyEventAdaptorFactory()
69 mResourceMapper->mReadAccessors.insert("summary", [](DummyEvent const *buffer) -> QVariant { 69 mResourceMapper->mReadAccessors.insert("summary", [](DummyEvent const *buffer) -> QVariant {
70 return QString::fromStdString(buffer->summary()->c_str()); 70 return QString::fromStdString(buffer->summary()->c_str());
71 }); 71 });
72 mLocalMapper = QSharedPointer<PropertyMapper<Akonadi2::Domain::Buffer::Event> >::create();
72 //TODO set accessors for all properties 73 //TODO set accessors for all properties
73 74
74} 75}
diff --git a/dummyresource/facade.cpp b/dummyresource/facade.cpp
index f754c7e..668fbbf 100644
--- a/dummyresource/facade.cpp
+++ b/dummyresource/facade.cpp
@@ -28,6 +28,7 @@
28#include "event_generated.h" 28#include "event_generated.h"
29#include "entity_generated.h" 29#include "entity_generated.h"
30#include "metadata_generated.h" 30#include "metadata_generated.h"
31#include "createentity_generated.h"
31#include "domainadaptor.h" 32#include "domainadaptor.h"
32#include <common/entitybuffer.h> 33#include <common/entitybuffer.h>
33 34
@@ -48,6 +49,29 @@ DummyResourceFacade::~DummyResourceFacade()
48void DummyResourceFacade::create(const Akonadi2::Domain::Event &domainObject) 49void DummyResourceFacade::create(const Akonadi2::Domain::Event &domainObject)
49{ 50{
50 //Create message buffer and send to resource 51 //Create message buffer and send to resource
52 flatbuffers::FlatBufferBuilder eventFbb;
53 eventFbb.Clear();
54 {
55 auto summary = eventFbb.CreateString("summary");
56 // auto data = fbb.CreateUninitializedVector<uint8_t>(attachmentSize);
57 DummyCalendar::DummyEventBuilder eventBuilder(eventFbb);
58 eventBuilder.add_summary(summary);
59 auto eventLocation = eventBuilder.Finish();
60 DummyCalendar::FinishDummyEventBuffer(eventFbb, eventLocation);
61 // memcpy((void*)DummyCalendar::GetDummyEvent(fbb.GetBufferPointer())->attachment()->Data(), rawData, attachmentSize);
62 }
63 flatbuffers::FlatBufferBuilder entityFbb;
64 Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, eventFbb.GetBufferPointer(), eventFbb.GetSize(), 0, 0);
65
66 flatbuffers::FlatBufferBuilder fbb;
67 auto type = fbb.CreateString(Akonadi2::Domain::getTypeName<Akonadi2::Domain::Event>().toStdString().data());
68 auto delta = fbb.CreateVector<uint8_t>(entityFbb.GetBufferPointer(), entityFbb.GetSize());
69 Akonadi2::Commands::CreateEntityBuilder builder(fbb);
70 builder.add_domainType(type);
71 builder.add_delta(delta);
72 auto location = builder.Finish();
73 Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location);
74 mResourceAccess->sendCommand(Akonadi2::Commands::CreateEntityCommand, fbb);
51} 75}
52 76
53void DummyResourceFacade::modify(const Akonadi2::Domain::Event &domainObject) 77void DummyResourceFacade::modify(const Akonadi2::Domain::Event &domainObject)
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp
index e4f7e41..e14aa01 100644
--- a/dummyresource/resourcefactory.cpp
+++ b/dummyresource/resourcefactory.cpp
@@ -23,18 +23,20 @@
23#include "pipeline.h" 23#include "pipeline.h"
24#include "dummycalendar_generated.h" 24#include "dummycalendar_generated.h"
25#include "metadata_generated.h" 25#include "metadata_generated.h"
26#include "queuedcommand_generated.h"
26#include "domainadaptor.h" 27#include "domainadaptor.h"
28#include "commands.h"
29#include "clientapi.h"
27#include <QUuid> 30#include <QUuid>
28 31
29/* 32/*
30 * Figure out how to implement various classes of processors: 33 * Figure out how to implement various classes of processors:
31 * * read-only (index and such) => domain adapter 34 * * read-only (index and such) => extractor function, probably using domain adaptor
32 * * filter => provide means to move entity elsewhere, and also reflect change in source (I guess?) 35 * * filter => provide means to move entity elsewhere, and also reflect change in source (I guess?)
33 * * flag extractors? => like read-only? Or write to local portion of buffer? 36 * * flag extractors? => like read-only? Or write to local portion of buffer?
34 * ** $ISSPAM should become part of domain object and is written to the local part of the mail. 37 * ** $ISSPAM should become part of domain object and is written to the local part of the mail.
35 * ** => value could be calculated by the server directly 38 * ** => value could be calculated by the server directly
36 */ 39 */
37// template <typename DomainType>
38class SimpleProcessor : public Akonadi2::Preprocessor 40class SimpleProcessor : public Akonadi2::Preprocessor
39{ 41{
40public: 42public:
@@ -96,21 +98,108 @@ QMap<QString, QString> populate()
96 98
97static QMap<QString, QString> s_dataSource = populate(); 99static QMap<QString, QString> s_dataSource = populate();
98 100
101//Drives the pipeline using the output from all command queues
102class Processor : public QObject
103{
104 Q_OBJECT
105public:
106 Processor(Akonadi2::Pipeline *pipeline, QList<MessageQueue*> commandQueues)
107 : QObject(),
108 mPipeline(pipeline),
109 mCommandQueues(commandQueues),
110 mProcessingLock(false)
111 {
112 for (auto queue : mCommandQueues) {
113 bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process);
114 Q_ASSERT(ret);
115 }
116 }
117
118private slots:
119 void process()
120 {
121 if (mProcessingLock) {
122 return;
123 }
124 mProcessingLock = true;
125 //Empty queue after queue
126 //FIXME the for and while loops should be async, otherwise we process all messages in parallel
127 for (auto queue : mCommandQueues) {
128 qDebug() << "processing queue";
129 bool processedMessage = false;
130 while (processedMessage) {
131 qDebug() << "process";
132 processedMessage = false;
133 queue->dequeue([this, &processedMessage](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) {
134 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size);
135 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) {
136 qWarning() << "invalid buffer";
137 processedMessage = false;
138 messageQueueCallback(false);
139 return;
140 }
141 auto queuedCommand = Akonadi2::GetQueuedCommand(ptr);
142 qDebug() << "Dequeued: " << queuedCommand->commandId();
143 //Throw command into appropriate pipeline
144 switch (queuedCommand->commandId()) {
145 case Akonadi2::Commands::DeleteEntityCommand:
146 //mPipeline->removedEntity
147 break;
148 case Akonadi2::Commands::ModifyEntityCommand:
149 //mPipeline->modifiedEntity
150 break;
151 case Akonadi2::Commands::CreateEntityCommand: {
152 //TODO job lifetime management
153 auto job = mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<void>([&messageQueueCallback](Async::Future<void> future) {
154 messageQueueCallback(true);
155 });
156 job.exec();
157 }
158 break;
159 default:
160 //Unhandled command
161 qWarning() << "Unhandled command";
162 messageQueueCallback(true);
163 break;
164 }
165 processedMessage = true;
166 },
167 [&processedMessage](const MessageQueue::Error &error) {
168 processedMessage = false;
169 });
170 }
171 }
172 mProcessingLock = false;
173 }
174
175private:
176 Akonadi2::Pipeline *mPipeline;
177 //Ordered by priority
178 QList<MessageQueue*> mCommandQueues;
179 bool mProcessingLock;
180};
181
99DummyResource::DummyResource() 182DummyResource::DummyResource()
100 : Akonadi2::Resource() 183 : Akonadi2::Resource(),
184 mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.userqueue"),
185 mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue")
101{ 186{
102} 187}
103 188
104void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) 189void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline)
105{ 190{
106 auto eventFactory = QSharedPointer<DummyEventAdaptorFactory>::create(); 191 auto eventFactory = QSharedPointer<DummyEventAdaptorFactory>::create();
192 //FIXME we should setup for each resource entity type, not for each domain type
193 //i.e. If a resource stores tags as part of each message it needs to update the tag index
107 //TODO setup preprocessors for each domain type and pipeline type allowing full customization 194 //TODO setup preprocessors for each domain type and pipeline type allowing full customization
108 //Eventually the order should be self configuring, for now it's hardcoded. 195 //Eventually the order should be self configuring, for now it's hardcoded.
109 auto eventIndexer = new SimpleProcessor([eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) { 196 auto eventIndexer = new SimpleProcessor([eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) {
110 auto adaptor = eventFactory->createAdaptor(entity); 197 auto adaptor = eventFactory->createAdaptor(entity);
111 qDebug() << "Summary preprocessor: " << adaptor->getProperty("summary").toString(); 198 qDebug() << "Summary preprocessor: " << adaptor->getProperty("summary").toString();
112 }); 199 });
113 pipeline->setPreprocessors<Akonadi2::Domain::Event>(Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer); 200 //event is the entitytype and not the domain type
201 pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer);
202 mProcessor = new Processor(pipeline, QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue);
114} 203}
115 204
116void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &rid, std::function<void(void *keyValue, int keySize, void *dataValue, int dataSize)> callback) 205void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &rid, std::function<void(void *keyValue, int keySize, void *dataValue, int dataSize)> callback)
@@ -139,6 +228,7 @@ void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &ri
139 228
140Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) 229Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline)
141{ 230{
231 qDebug() << "synchronizeWithSource";
142 return Async::start<void>([this, pipeline](Async::Future<void> &f) { 232 return Async::start<void>([this, pipeline](Async::Future<void> &f) {
143 //TODO use a read-only transaction during the complete sync to sync against a defined revision 233 //TODO use a read-only transaction during the complete sync to sync against a defined revision
144 auto storage = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); 234 auto storage = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::Store::storageLocation(), "org.kde.dummy");
@@ -171,7 +261,9 @@ Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli
171 DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); 261 DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer);
172 //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. 262 //TODO toRFC4122 would probably be more efficient, but results in non-printable keys.
173 const auto key = QUuid::createUuid().toString().toUtf8(); 263 const auto key = QUuid::createUuid().toString().toUtf8();
174 pipeline->newEntity<Akonadi2::Domain::Event>(key, m_fbb.GetBufferPointer(), m_fbb.GetSize()); 264 //TODO Create queuedcommand and push into synchronizerQueue
265 //* create message in store directly, add command to messagequeue waiting for processing.
266 // pipeline->newEntity<Akonadi2::Domain::Event>(key, m_fbb.GetBufferPointer(), m_fbb.GetSize());
175 } else { //modification 267 } else { //modification
176 //TODO diff and create modification if necessary 268 //TODO diff and create modification if necessary
177 } 269 }
@@ -183,16 +275,18 @@ Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli
183 275
184void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) 276void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline)
185{ 277{
186 Q_UNUSED(commandId) 278 qDebug() << "processCommand";
187 Q_UNUSED(data) 279 //TODO instead of copying the command including the full entity first into the command queue, we could directly
188 Q_UNUSED(size) 280 //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay).
189 //TODO reallly process the commands :) 281 //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire).
190 auto builder = DummyCalendar::DummyEventBuilder(m_fbb);
191 builder .add_summary(m_fbb.CreateString("summary summary!"));
192 auto buffer = builder.Finish();
193 DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer);
194 pipeline->newEntity<Akonadi2::Domain::Event>("fakekey", m_fbb.GetBufferPointer(), m_fbb.GetSize());
195 m_fbb.Clear(); 282 m_fbb.Clear();
283 auto commandData = m_fbb.CreateVector(reinterpret_cast<uint8_t const *>(data.data()), data.size());
284 auto builder = Akonadi2::QueuedCommandBuilder(m_fbb);
285 builder.add_commandId(commandId);
286 builder.add_command(commandData);
287 auto buffer = builder.Finish();
288 Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer);
289 mUserQueue.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize());
196} 290}
197 291
198DummyResourceFactory::DummyResourceFactory(QObject *parent) 292DummyResourceFactory::DummyResourceFactory(QObject *parent)
@@ -211,3 +305,4 @@ void DummyResourceFactory::registerFacades(Akonadi2::FacadeFactory &factory)
211 factory.registerFacade<Akonadi2::Domain::Event, DummyResourceFacade>(PLUGIN_NAME); 305 factory.registerFacade<Akonadi2::Domain::Event, DummyResourceFacade>(PLUGIN_NAME);
212} 306}
213 307
308#include "resourcefactory.moc"
diff --git a/dummyresource/resourcefactory.h b/dummyresource/resourcefactory.h
index 427fcc6..682f25c 100644
--- a/dummyresource/resourcefactory.h
+++ b/dummyresource/resourcefactory.h
@@ -21,12 +21,15 @@
21 21
22#include "common/resource.h" 22#include "common/resource.h"
23#include "async/src/async.h" 23#include "async/src/async.h"
24#include "common/messagequeue.h"
24 25
25#include <flatbuffers/flatbuffers.h> 26#include <flatbuffers/flatbuffers.h>
26 27
27//TODO: a little ugly to have this in two places, once here and once in Q_PLUGIN_METADATA 28//TODO: a little ugly to have this in two places, once here and once in Q_PLUGIN_METADATA
28#define PLUGIN_NAME "org.kde.dummy" 29#define PLUGIN_NAME "org.kde.dummy"
29 30
31class Processor;
32
30class DummyResource : public Akonadi2::Resource 33class DummyResource : public Akonadi2::Resource
31{ 34{
32public: 35public:
@@ -37,6 +40,9 @@ public:
37 40
38private: 41private:
39 flatbuffers::FlatBufferBuilder m_fbb; 42 flatbuffers::FlatBufferBuilder m_fbb;
43 MessageQueue mUserQueue;
44 MessageQueue mSynchronizerQueue;
45 Processor *mProcessor;
40}; 46};
41 47
42class DummyResourceFactory : public Akonadi2::ResourceFactory 48class DummyResourceFactory : public Akonadi2::ResourceFactory
diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp
index b39b2b1..ddb59a5 100644
--- a/tests/dummyresourcetest.cpp
+++ b/tests/dummyresourcetest.cpp
@@ -2,8 +2,21 @@
2 2
3#include <QString> 3#include <QString>
4 4
5// #include "dummycalendar_generated.h"
6#include "event_generated.h"
7#include "entity_generated.h"
8#include "metadata_generated.h"
9#include "createentity_generated.h"
5#include "dummyresource/resourcefactory.h" 10#include "dummyresource/resourcefactory.h"
6#include "clientapi.h" 11#include "clientapi.h"
12#include "commands.h"
13#include "entitybuffer.h"
14
15static void removeFromDisk(const QString &name)
16{
17 Akonadi2::Storage store(Akonadi2::Store::storageLocation(), "org.kde.dummy", Akonadi2::Storage::ReadWrite);
18 store.removeFromDisk();
19}
7 20
8class DummyResourceTest : public QObject 21class DummyResourceTest : public QObject
9{ 22{
@@ -13,34 +26,70 @@ private Q_SLOTS:
13 { 26 {
14 auto factory = Akonadi2::ResourceFactory::load("org.kde.dummy"); 27 auto factory = Akonadi2::ResourceFactory::load("org.kde.dummy");
15 QVERIFY(factory); 28 QVERIFY(factory);
16 Akonadi2::Storage store(Akonadi2::Store::storageLocation(), "org.kde.dummy", Akonadi2::Storage::ReadWrite); 29 removeFromDisk("org.kde.dummy");
17 store.removeFromDisk(); 30 removeFromDisk("org.kde.dummy.userqueue");
31 removeFromDisk("org.kde.dummy.synchronizerqueue");
18 } 32 }
19 33
20 void cleanupTestCase() 34 void cleanupTestCase()
21 { 35 {
22 } 36 }
23 37
24 void testResource() 38 void testProcessCommand()
25 { 39 {
40 flatbuffers::FlatBufferBuilder eventFbb;
41 eventFbb.Clear();
42 {
43 auto summary = eventFbb.CreateString("summary");
44 Akonadi2::Domain::Buffer::EventBuilder eventBuilder(eventFbb);
45 eventBuilder.add_summary(summary);
46 auto eventLocation = eventBuilder.Finish();
47 Akonadi2::Domain::Buffer::FinishEventBuffer(eventFbb, eventLocation);
48 }
49
50 flatbuffers::FlatBufferBuilder entityFbb;
51 Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, eventFbb.GetBufferPointer(), eventFbb.GetSize(), 0, 0);
52
53 flatbuffers::FlatBufferBuilder fbb;
54 auto type = fbb.CreateString(Akonadi2::Domain::getTypeName<Akonadi2::Domain::Event>().toStdString().data());
55 auto delta = fbb.CreateVector<uint8_t>(entityFbb.GetBufferPointer(), entityFbb.GetSize());
56 Akonadi2::Commands::CreateEntityBuilder builder(fbb);
57 builder.add_domainType(type);
58 builder.add_delta(delta);
59 auto location = builder.Finish();
60 Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location);
61
62 const QByteArray command(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize());
63
26 Akonadi2::Pipeline pipeline("org.kde.dummy"); 64 Akonadi2::Pipeline pipeline("org.kde.dummy");
27 DummyResource resource; 65 DummyResource resource;
28 auto job = resource.synchronizeWithSource(&pipeline); 66 resource.configurePipeline(&pipeline);
29 auto future = job.exec(); 67 resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, command.size(), &pipeline);
30 QTRY_VERIFY(future.isFinished()); 68 //TODO wait until the pipeline has processed the command
69 QTest::qWait(1000);
31 } 70 }
32 71
33 void testSyncAndFacade() 72 // void testResourceSync()
34 { 73 // {
35 Akonadi2::Query query; 74 // Akonadi2::Pipeline pipeline("org.kde.dummy");
36 query.resources << "org.kde.dummy"; 75 // DummyResource resource;
37 76 // auto job = resource.synchronizeWithSource(&pipeline);
38 async::SyncListResult<Akonadi2::Domain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::Domain::Event>(query)); 77 // auto future = job.exec();
39 result.exec(); 78 // QTRY_VERIFY(future.isFinished());
40 QVERIFY(!result.isEmpty()); 79 // }
41 auto value = result.first(); 80
42 QVERIFY(!value->getProperty("summary").toString().isEmpty()); 81 // void testSyncAndFacade()
43 } 82 // {
83 // Akonadi2::Query query;
84 // query.resources << "org.kde.dummy";
85
86 // async::SyncListResult<Akonadi2::Domain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::Domain::Event>(query));
87 // result.exec();
88 // QVERIFY(!result.isEmpty());
89 // auto value = result.first();
90 // QVERIFY(!value->getProperty("summary").toString().isEmpty());
91 // qDebug() << value->getProperty("summary").toString();
92 // }
44 93
45}; 94};
46 95