From bc2a95cad05e454a84c317f1078edb329bd3afd4 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 15 Jan 2015 01:56:09 +0100 Subject: Writing from facade. --- common/CMakeLists.txt | 1 + common/commands/createentity.fbs | 2 +- common/entitybuffer.cpp | 9 +-- common/entitybuffer.h | 2 +- common/metadata.fbs | 1 + common/pipeline.cpp | 79 ++++++++++++++++-------- common/pipeline.h | 41 ++++--------- common/queuedcommand.fbs | 11 ++++ common/resourceaccess.h | 2 +- dummyresource/domainadaptor.cpp | 5 +- dummyresource/facade.cpp | 24 ++++++++ dummyresource/resourcefactory.cpp | 123 +++++++++++++++++++++++++++++++++----- dummyresource/resourcefactory.h | 6 ++ tests/dummyresourcetest.cpp | 83 +++++++++++++++++++------ 14 files changed, 293 insertions(+), 96 deletions(-) create mode 100644 common/queuedcommand.fbs diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 671d1cd..6f8fee3 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -10,6 +10,7 @@ generate_flatbuffers( domain/event entity metadata + queuedcommand ) if (STORAGE_unqlite) diff --git a/common/commands/createentity.fbs b/common/commands/createentity.fbs index 564c231..23eeff9 100644 --- a/common/commands/createentity.fbs +++ b/common/commands/createentity.fbs @@ -1,4 +1,4 @@ -namespace Akonadi2; +namespace Akonadi2.Commands; table CreateEntity { domainType: string; diff --git a/common/entitybuffer.cpp b/common/entitybuffer.cpp index aa5847c..4af84ef 100644 --- a/common/entitybuffer.cpp +++ b/common/entitybuffer.cpp @@ -56,11 +56,12 @@ void EntityBuffer::extractResourceBuffer(void *dataValue, int dataSize, const st } } -void EntityBuffer::assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, void *metadataData, size_t metadataSize, void *resourceData, size_t resourceSize, void *localData, size_t localSize) +void EntityBuffer::assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, void const *metadataData, size_t metadataSize, void const *resourceData, size_t resourceSize, void const *localData, size_t localSize) { - auto metadata = fbb.CreateVector(static_cast(metadataData), metadataSize); - auto resource = fbb.CreateVector(static_cast(resourceData), resourceSize); - auto local = fbb.CreateVector(static_cast(localData), localSize); + qDebug() << "res size: " << resourceSize; + auto metadata = fbb.CreateVector(static_cast(metadataData), metadataSize); + auto resource = fbb.CreateVector(static_cast(resourceData), resourceSize); + auto local = fbb.CreateVector(static_cast(localData), localSize); auto builder = Akonadi2::EntityBuilder(fbb); builder.add_metadata(metadata); builder.add_resource(resource); diff --git a/common/entitybuffer.h b/common/entitybuffer.h index 600b04d..097b450 100644 --- a/common/entitybuffer.h +++ b/common/entitybuffer.h @@ -15,7 +15,7 @@ public: const Entity &entity(); static void extractResourceBuffer(void *dataValue, int dataSize, const std::function &handler); - static void assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, void *metadataData, size_t metadataSize, void *resourceData, size_t resourceSize, void *localData, size_t localSize); + static void assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, void const *metadataData, size_t metadataSize, void const *resourceData, size_t resourceSize, void const *localData, size_t localSize); private: const Entity *mEntity; diff --git a/common/metadata.fbs b/common/metadata.fbs index 34a8df2..bb1163d 100644 --- a/common/metadata.fbs +++ b/common/metadata.fbs @@ -3,6 +3,7 @@ namespace Akonadi2; table Metadata { revision: ulong; processed: bool = true; + processingProgress: [string]; } root_type Metadata; diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 10bae54..9cc7450 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -23,10 +23,13 @@ #include #include #include +#include #include #include "entity_generated.h" #include "metadata_generated.h" +#include "createentity_generated.h" #include "entitybuffer.h" +#include "async/src/async.h" namespace Akonadi2 { @@ -90,39 +93,59 @@ void Pipeline::null() // state.step(); } -void Pipeline::newEntity(const QString &entityType, const QByteArray &key, void *resourceBufferData, size_t size) +Async::Job Pipeline::newEntity(void const *command, size_t size) { - const qint64 newRevision = storage().maxRevision() + 1; - - //Add metadata buffer - flatbuffers::FlatBufferBuilder metadataFbb; - auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); - metadataBuilder.add_revision(newRevision); - metadataBuilder.add_processed(false); - auto metadataBuffer = metadataBuilder.Finish(); - Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); - - flatbuffers::FlatBufferBuilder fbb; - EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), resourceBufferData, size, 0, 0); - - storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); - storage().setMaxRevision(newRevision); + qDebug() << "new entity"; + Async::start([&](Async::Future future) { + + //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. + const auto key = QUuid::createUuid().toString().toUtf8(); + + //TODO figure out if we already have created a revision for the message? + const qint64 newRevision = storage().maxRevision() + 1; + + auto createEntity = Akonadi2::Commands::GetCreateEntity(command); + //TODO rename createEntitiy->domainType to bufferType + const QString entityType = QString::fromUtf8(reinterpret_cast(createEntity->domainType()->Data()), createEntity->domainType()->size()); + auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); + // + // const QString entityType; + // auto entity = Akonadi2::GetEntity(0); + + //Add metadata buffer + flatbuffers::FlatBufferBuilder metadataFbb; + auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); + metadataBuilder.add_revision(newRevision); + metadataBuilder.add_processed(false); + auto metadataBuffer = metadataBuilder.Finish(); + Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); + //TODO we should reserve some space in metadata for in-place updates + + flatbuffers::FlatBufferBuilder fbb; + EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); + + storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); + storage().setMaxRevision(newRevision); + + PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() { + future.setFinished(); + }); + d->activePipelines << state; + state.step(); - PipelineState state(this, NewPipeline, key, d->newPipeline[entityType]); - d->activePipelines << state; - state.step(); + }); } void Pipeline::modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size) { - PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType]); + PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], [](){}); d->activePipelines << state; state.step(); } void Pipeline::deletedEntity(const QString &entityType, const QByteArray &key) { - PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType]); + PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType], [](){}); d->activePipelines << state; state.step(); } @@ -160,6 +183,7 @@ void Pipeline::pipelineCompleted(const PipelineState &state) } if (state.type() != NullPipeline) { + //TODO what revision is finalized? emit revisionUpdated(); } scheduleStep(); @@ -172,12 +196,13 @@ void Pipeline::pipelineCompleted(const PipelineState &state) class PipelineState::Private : public QSharedData { public: - Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector filters) + Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector filters, const std::function &c) : pipeline(p), type(t), key(k), filterIt(filters), - idle(true) + idle(true), + callback(c) {} Private() @@ -191,6 +216,7 @@ public: QByteArray key; QVectorIterator filterIt; bool idle; + std::function callback; }; PipelineState::PipelineState() @@ -199,8 +225,8 @@ PipelineState::PipelineState() } -PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector &filters) - : d(new Private(pipeline, type, key, filters)) +PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector &filters, const std::function &callback) + : d(new Private(pipeline, type, key, filters, callback)) { } @@ -247,6 +273,7 @@ void PipelineState::step() d->idle = false; if (d->filterIt.hasNext()) { + //TODO skip step if already processed d->pipeline->storage().scan(d->key.toStdString(), [this](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { auto entity = Akonadi2::GetEntity(dataValue); d->filterIt.next()->process(*this, *entity); @@ -254,11 +281,13 @@ void PipelineState::step() }); } else { d->pipeline->pipelineCompleted(*this); + d->callback(); } } void PipelineState::processingCompleted(Preprocessor *filter) { + //TODO record processing progress if (d->pipeline && filter == d->filterIt.peekPrevious()) { d->idle = true; d->pipeline->pipelineStepped(*this); diff --git a/common/pipeline.h b/common/pipeline.h index 6b847f5..918d21e 100644 --- a/common/pipeline.h +++ b/common/pipeline.h @@ -27,7 +27,7 @@ #include #include -#include //For domain types +#include "async/src/async.h" #include "entity_generated.h" @@ -49,34 +49,13 @@ public: Storage &storage() const; - // template - // Storage &storage() const; - - template - void setPreprocessors(Type type, const QVector &preprocessors) - { - setPreprocessors(Akonadi2::Domain::getTypeName(), type, preprocessors); - } + void setPreprocessors(const QString &entityType, Type pipelineType, const QVector &preprocessors); void null(); - template - void newEntity(const QByteArray &key, void *resourceBufferData, size_t size) - { - newEntity(Akonadi2::Domain::getTypeName(), key, resourceBufferData, size); - } - - template - void modifiedEntity(const QByteArray &key, void *data, size_t size) - { - modifiedEntity(Akonadi2::Domain::getTypeName(), key, data, size); - } - - template - void deletedEntity(const QByteArray &key) - { - deletedEntity(Akonadi2::Domain::getTypeName(), key); - } + Async::Job newEntity(void const *command, size_t size); + void modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size); + void deletedEntity(const QString &entityType, const QByteArray &key); Q_SIGNALS: void revisionUpdated(); @@ -86,10 +65,6 @@ private Q_SLOTS: void stepPipelines(); private: - void setPreprocessors(const QString &entityType, Type pipelineType, const QVector &preprocessors); - void newEntity(const QString &entityType, const QByteArray &key, void *resourceBufferData, size_t size); - void modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size); - void deletedEntity(const QString &entityType, const QByteArray &key); void pipelineStepped(const PipelineState &state); void pipelineCompleted(const PipelineState &state); void scheduleStep(); @@ -104,7 +79,7 @@ class AKONADI2COMMON_EXPORT PipelineState { public: PipelineState(); - PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector &filters); + PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector &filters, const std::function &callback); PipelineState(const PipelineState &other); ~PipelineState(); @@ -114,6 +89,7 @@ public: bool isIdle() const; QByteArray key() const; Pipeline::Type type() const; + //TODO expose command void step(); void processingCompleted(Preprocessor *filter); @@ -129,7 +105,10 @@ public: Preprocessor(); virtual ~Preprocessor(); + //TODO pass actual command as well, for changerecording virtual void process(PipelineState state, const Akonadi2::Entity &); + //TODO to record progress + // virtual QString id(); protected: void processingCompleted(PipelineState state); diff --git a/common/queuedcommand.fbs b/common/queuedcommand.fbs new file mode 100644 index 0000000..0ca899f --- /dev/null +++ b/common/queuedcommand.fbs @@ -0,0 +1,11 @@ +namespace Akonadi2; + +table QueuedCommand { + commandId: int; + command: [ubyte]; + // entityId: string; + // sourceRevision: ulong; + // targetRevision: [ubyte]; +} + +root_type QueuedCommand; diff --git a/common/resourceaccess.h b/common/resourceaccess.h index d45ebde..0a333f6 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h @@ -43,7 +43,7 @@ public: //TODO use jobs void sendCommand(int commandId, const std::function &callback = std::function()); - void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function &callback); + void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function &callback = std::function()); Async::Job synchronizeResource(); public Q_SLOTS: diff --git a/dummyresource/domainadaptor.cpp b/dummyresource/domainadaptor.cpp index ae001cf..9bd3770 100644 --- a/dummyresource/domainadaptor.cpp +++ b/dummyresource/domainadaptor.cpp @@ -29,8 +29,8 @@ public: void setProperty(const QString &key, const QVariant &value) { - if (mResourceMapper->mWriteAccessors.contains(key)) { - // mResourceMapper.setProperty(key, value, mResourceBuffer); + if (mResourceMapper && mResourceMapper->mWriteAccessors.contains(key)) { + // mResourceMapper->setProperty(key, value, mResourceBuffer); } else { // mLocalMapper.; } @@ -69,6 +69,7 @@ DummyEventAdaptorFactory::DummyEventAdaptorFactory() mResourceMapper->mReadAccessors.insert("summary", [](DummyEvent const *buffer) -> QVariant { return QString::fromStdString(buffer->summary()->c_str()); }); + mLocalMapper = QSharedPointer >::create(); //TODO set accessors for all properties } diff --git a/dummyresource/facade.cpp b/dummyresource/facade.cpp index f754c7e..668fbbf 100644 --- a/dummyresource/facade.cpp +++ b/dummyresource/facade.cpp @@ -28,6 +28,7 @@ #include "event_generated.h" #include "entity_generated.h" #include "metadata_generated.h" +#include "createentity_generated.h" #include "domainadaptor.h" #include @@ -48,6 +49,29 @@ DummyResourceFacade::~DummyResourceFacade() void DummyResourceFacade::create(const Akonadi2::Domain::Event &domainObject) { //Create message buffer and send to resource + flatbuffers::FlatBufferBuilder eventFbb; + eventFbb.Clear(); + { + auto summary = eventFbb.CreateString("summary"); + // auto data = fbb.CreateUninitializedVector(attachmentSize); + DummyCalendar::DummyEventBuilder eventBuilder(eventFbb); + eventBuilder.add_summary(summary); + auto eventLocation = eventBuilder.Finish(); + DummyCalendar::FinishDummyEventBuffer(eventFbb, eventLocation); + // memcpy((void*)DummyCalendar::GetDummyEvent(fbb.GetBufferPointer())->attachment()->Data(), rawData, attachmentSize); + } + flatbuffers::FlatBufferBuilder entityFbb; + Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, eventFbb.GetBufferPointer(), eventFbb.GetSize(), 0, 0); + + flatbuffers::FlatBufferBuilder fbb; + auto type = fbb.CreateString(Akonadi2::Domain::getTypeName().toStdString().data()); + auto delta = fbb.CreateVector(entityFbb.GetBufferPointer(), entityFbb.GetSize()); + Akonadi2::Commands::CreateEntityBuilder builder(fbb); + builder.add_domainType(type); + builder.add_delta(delta); + auto location = builder.Finish(); + Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); + mResourceAccess->sendCommand(Akonadi2::Commands::CreateEntityCommand, fbb); } void DummyResourceFacade::modify(const Akonadi2::Domain::Event &domainObject) diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index e4f7e41..e14aa01 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp @@ -23,18 +23,20 @@ #include "pipeline.h" #include "dummycalendar_generated.h" #include "metadata_generated.h" +#include "queuedcommand_generated.h" #include "domainadaptor.h" +#include "commands.h" +#include "clientapi.h" #include /* * Figure out how to implement various classes of processors: - * * read-only (index and such) => domain adapter + * * read-only (index and such) => extractor function, probably using domain adaptor * * filter => provide means to move entity elsewhere, and also reflect change in source (I guess?) * * flag extractors? => like read-only? Or write to local portion of buffer? * ** $ISSPAM should become part of domain object and is written to the local part of the mail. * ** => value could be calculated by the server directly */ -// template class SimpleProcessor : public Akonadi2::Preprocessor { public: @@ -96,21 +98,108 @@ QMap populate() static QMap s_dataSource = populate(); +//Drives the pipeline using the output from all command queues +class Processor : public QObject +{ + Q_OBJECT +public: + Processor(Akonadi2::Pipeline *pipeline, QList commandQueues) + : QObject(), + mPipeline(pipeline), + mCommandQueues(commandQueues), + mProcessingLock(false) + { + for (auto queue : mCommandQueues) { + bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process); + Q_ASSERT(ret); + } + } + +private slots: + void process() + { + if (mProcessingLock) { + return; + } + mProcessingLock = true; + //Empty queue after queue + //FIXME the for and while loops should be async, otherwise we process all messages in parallel + for (auto queue : mCommandQueues) { + qDebug() << "processing queue"; + bool processedMessage = false; + while (processedMessage) { + qDebug() << "process"; + processedMessage = false; + queue->dequeue([this, &processedMessage](void *ptr, int size, std::function messageQueueCallback) { + flatbuffers::Verifier verifyer(reinterpret_cast(ptr), size); + if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { + qWarning() << "invalid buffer"; + processedMessage = false; + messageQueueCallback(false); + return; + } + auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); + qDebug() << "Dequeued: " << queuedCommand->commandId(); + //Throw command into appropriate pipeline + switch (queuedCommand->commandId()) { + case Akonadi2::Commands::DeleteEntityCommand: + //mPipeline->removedEntity + break; + case Akonadi2::Commands::ModifyEntityCommand: + //mPipeline->modifiedEntity + break; + case Akonadi2::Commands::CreateEntityCommand: { + //TODO job lifetime management + auto job = mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then([&messageQueueCallback](Async::Future future) { + messageQueueCallback(true); + }); + job.exec(); + } + break; + default: + //Unhandled command + qWarning() << "Unhandled command"; + messageQueueCallback(true); + break; + } + processedMessage = true; + }, + [&processedMessage](const MessageQueue::Error &error) { + processedMessage = false; + }); + } + } + mProcessingLock = false; + } + +private: + Akonadi2::Pipeline *mPipeline; + //Ordered by priority + QList mCommandQueues; + bool mProcessingLock; +}; + DummyResource::DummyResource() - : Akonadi2::Resource() + : Akonadi2::Resource(), + mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.userqueue"), + mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue") { } void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) { auto eventFactory = QSharedPointer::create(); + //FIXME we should setup for each resource entity type, not for each domain type + //i.e. If a resource stores tags as part of each message it needs to update the tag index //TODO setup preprocessors for each domain type and pipeline type allowing full customization //Eventually the order should be self configuring, for now it's hardcoded. auto eventIndexer = new SimpleProcessor([eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) { auto adaptor = eventFactory->createAdaptor(entity); qDebug() << "Summary preprocessor: " << adaptor->getProperty("summary").toString(); }); - pipeline->setPreprocessors(Akonadi2::Pipeline::NewPipeline, QVector() << eventIndexer); + //event is the entitytype and not the domain type + pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector() << eventIndexer); + mProcessor = new Processor(pipeline, QList() << &mUserQueue << &mSynchronizerQueue); } void findByRemoteId(QSharedPointer storage, const QString &rid, std::function callback) @@ -139,6 +228,7 @@ void findByRemoteId(QSharedPointer storage, const QString &ri Async::Job DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) { + qDebug() << "synchronizeWithSource"; return Async::start([this, pipeline](Async::Future &f) { //TODO use a read-only transaction during the complete sync to sync against a defined revision auto storage = QSharedPointer::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); @@ -171,7 +261,9 @@ Async::Job DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. const auto key = QUuid::createUuid().toString().toUtf8(); - pipeline->newEntity(key, m_fbb.GetBufferPointer(), m_fbb.GetSize()); + //TODO Create queuedcommand and push into synchronizerQueue + //* create message in store directly, add command to messagequeue waiting for processing. + // pipeline->newEntity(key, m_fbb.GetBufferPointer(), m_fbb.GetSize()); } else { //modification //TODO diff and create modification if necessary } @@ -183,16 +275,18 @@ Async::Job DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) { - Q_UNUSED(commandId) - Q_UNUSED(data) - Q_UNUSED(size) - //TODO reallly process the commands :) - auto builder = DummyCalendar::DummyEventBuilder(m_fbb); - builder .add_summary(m_fbb.CreateString("summary summary!")); - auto buffer = builder.Finish(); - DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); - pipeline->newEntity("fakekey", m_fbb.GetBufferPointer(), m_fbb.GetSize()); + qDebug() << "processCommand"; + //TODO instead of copying the command including the full entity first into the command queue, we could directly + //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). + //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire). m_fbb.Clear(); + auto commandData = m_fbb.CreateVector(reinterpret_cast(data.data()), data.size()); + auto builder = Akonadi2::QueuedCommandBuilder(m_fbb); + builder.add_commandId(commandId); + builder.add_command(commandData); + auto buffer = builder.Finish(); + Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer); + mUserQueue.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); } DummyResourceFactory::DummyResourceFactory(QObject *parent) @@ -211,3 +305,4 @@ void DummyResourceFactory::registerFacades(Akonadi2::FacadeFactory &factory) factory.registerFacade(PLUGIN_NAME); } +#include "resourcefactory.moc" diff --git a/dummyresource/resourcefactory.h b/dummyresource/resourcefactory.h index 427fcc6..682f25c 100644 --- a/dummyresource/resourcefactory.h +++ b/dummyresource/resourcefactory.h @@ -21,12 +21,15 @@ #include "common/resource.h" #include "async/src/async.h" +#include "common/messagequeue.h" #include //TODO: a little ugly to have this in two places, once here and once in Q_PLUGIN_METADATA #define PLUGIN_NAME "org.kde.dummy" +class Processor; + class DummyResource : public Akonadi2::Resource { public: @@ -37,6 +40,9 @@ public: private: flatbuffers::FlatBufferBuilder m_fbb; + MessageQueue mUserQueue; + MessageQueue mSynchronizerQueue; + Processor *mProcessor; }; class DummyResourceFactory : public Akonadi2::ResourceFactory diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp index b39b2b1..ddb59a5 100644 --- a/tests/dummyresourcetest.cpp +++ b/tests/dummyresourcetest.cpp @@ -2,8 +2,21 @@ #include +// #include "dummycalendar_generated.h" +#include "event_generated.h" +#include "entity_generated.h" +#include "metadata_generated.h" +#include "createentity_generated.h" #include "dummyresource/resourcefactory.h" #include "clientapi.h" +#include "commands.h" +#include "entitybuffer.h" + +static void removeFromDisk(const QString &name) +{ + Akonadi2::Storage store(Akonadi2::Store::storageLocation(), "org.kde.dummy", Akonadi2::Storage::ReadWrite); + store.removeFromDisk(); +} class DummyResourceTest : public QObject { @@ -13,34 +26,70 @@ private Q_SLOTS: { auto factory = Akonadi2::ResourceFactory::load("org.kde.dummy"); QVERIFY(factory); - Akonadi2::Storage store(Akonadi2::Store::storageLocation(), "org.kde.dummy", Akonadi2::Storage::ReadWrite); - store.removeFromDisk(); + removeFromDisk("org.kde.dummy"); + removeFromDisk("org.kde.dummy.userqueue"); + removeFromDisk("org.kde.dummy.synchronizerqueue"); } void cleanupTestCase() { } - void testResource() + void testProcessCommand() { + flatbuffers::FlatBufferBuilder eventFbb; + eventFbb.Clear(); + { + auto summary = eventFbb.CreateString("summary"); + Akonadi2::Domain::Buffer::EventBuilder eventBuilder(eventFbb); + eventBuilder.add_summary(summary); + auto eventLocation = eventBuilder.Finish(); + Akonadi2::Domain::Buffer::FinishEventBuffer(eventFbb, eventLocation); + } + + flatbuffers::FlatBufferBuilder entityFbb; + Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, eventFbb.GetBufferPointer(), eventFbb.GetSize(), 0, 0); + + flatbuffers::FlatBufferBuilder fbb; + auto type = fbb.CreateString(Akonadi2::Domain::getTypeName().toStdString().data()); + auto delta = fbb.CreateVector(entityFbb.GetBufferPointer(), entityFbb.GetSize()); + Akonadi2::Commands::CreateEntityBuilder builder(fbb); + builder.add_domainType(type); + builder.add_delta(delta); + auto location = builder.Finish(); + Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); + + const QByteArray command(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()); + Akonadi2::Pipeline pipeline("org.kde.dummy"); DummyResource resource; - auto job = resource.synchronizeWithSource(&pipeline); - auto future = job.exec(); - QTRY_VERIFY(future.isFinished()); + resource.configurePipeline(&pipeline); + resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, command.size(), &pipeline); + //TODO wait until the pipeline has processed the command + QTest::qWait(1000); } - void testSyncAndFacade() - { - Akonadi2::Query query; - query.resources << "org.kde.dummy"; - - async::SyncListResult result(Akonadi2::Store::load(query)); - result.exec(); - QVERIFY(!result.isEmpty()); - auto value = result.first(); - QVERIFY(!value->getProperty("summary").toString().isEmpty()); - } + // void testResourceSync() + // { + // Akonadi2::Pipeline pipeline("org.kde.dummy"); + // DummyResource resource; + // auto job = resource.synchronizeWithSource(&pipeline); + // auto future = job.exec(); + // QTRY_VERIFY(future.isFinished()); + // } + + // void testSyncAndFacade() + // { + // Akonadi2::Query query; + // query.resources << "org.kde.dummy"; + + // async::SyncListResult result(Akonadi2::Store::load(query)); + // result.exec(); + // QVERIFY(!result.isEmpty()); + // auto value = result.first(); + // QVERIFY(!value->getProperty("summary").toString().isEmpty()); + // qDebug() << value->getProperty("summary").toString(); + // } }; -- cgit v1.2.3