/*
 * Copyright (C) 2014 Aaron Seigo
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) version 3, or any
 * later version accepted by the membership of KDE e.V. (or its
 * successor approved by the membership of KDE e.V.), which shall
 * act as a proxy defined in Section 6 of version 3 of the license.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "pipeline.h"

// NOTE(review): the targets of these four includes were lost in the source
// this was recovered from; they are reconstructed from the Qt classes the
// file actually uses -- confirm against version control.
#include <QByteArray>
#include <QStandardPaths>
#include <QVector>
#include <QDebug>
#include "entitybuffer_generated.h"
#include "metadata_generated.h"

namespace Akonadi2
{

/**
 * Private data of a Pipeline: the backing storage, one filter list per
 * pipeline type, the list of pipeline runs currently in flight and a flag
 * that records whether a queued call to stepPipelines() is pending.
 */
class Pipeline::Private
{
public:
    explicit Private(const QString &resourceName)
        : storage(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceName, Storage::ReadWrite),
          stepScheduled(false)
    {
    }

    Storage storage;
    QVector<Preprocessor *> nullPipeline;
    QVector<Preprocessor *> newPipeline;
    QVector<Preprocessor *> modifiedPipeline;
    QVector<Preprocessor *> deletedPipeline;
    QVector<PipelineState> activePipelines;
    bool stepScheduled;
};

/**
 * Creates a pipeline whose storage is opened read-write for @p resourceName.
 */
Pipeline::Pipeline(const QString &resourceName, QObject *parent)
    : QObject(parent),
      d(new Private(resourceName))
{
}

Pipeline::~Pipeline()
{
    delete d;
}

/**
 * @return the storage owned by this pipeline.
 */
Storage &Pipeline::storage() const
{
    return d->storage;
}

/**
 * Starts a run of the null pipeline (empty key) and performs its first step.
 */
void Pipeline::null()
{
    //TODO: is there really any need for the null pipeline? if so, it should be doing something ;)
    PipelineState state(this, NullPipeline, QByteArray(), d->nullPipeline);
    d->activePipelines << state;
    state.step();
}

/**
 * Wraps the given resource buffer together with a freshly built metadata
 * buffer into an entity buffer, writes it to storage under @p key, bumps the
 * storage's max revision and starts a run of the "new" pipeline for the key.
 *
 * @param key                 storage key of the entity
 * @param resourceBufferData  start of the resource buffer; read as @p size bytes
 * @param size                number of bytes at @p resourceBufferData
 */
void Pipeline::newEntity(const QByteArray &key, void *resourceBufferData, size_t size)
{
    const qint64 newRevision = storage().maxRevision() + 1;

    // Build the metadata buffer in its own builder and finish it *on that
    // builder*; the root offset is only meaningful to the builder that made it.
    flatbuffers::FlatBufferBuilder metadataFbb;
    auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb);
    metadataBuilder.add_revision(newRevision);
    auto metadataBuffer = metadataBuilder.Finish();
    Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer);

    // All vectors have to exist before the table that refers to them is
    // started, so create them ahead of the EntityBufferBuilder.
    flatbuffers::FlatBufferBuilder fbb;
    //TODO use memcpy
    auto metadata = fbb.CreateVector<uint8_t>(metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
    //TODO use memcpy
    auto resource = fbb.CreateVector<uint8_t>(static_cast<const uint8_t *>(resourceBufferData), size);

    auto builder = Akonadi2::EntityBufferBuilder(fbb);
    builder.add_metadata(metadata);
    builder.add_resource(resource);
    //We don't have a local buffer yet
    // builder.add_local();
    auto buffer = builder.Finish();
    Akonadi2::FinishEntityBufferBuffer(fbb, buffer);

    qDebug() << "writing new entity" << key;
    storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize());
    storage().setMaxRevision(newRevision);

    PipelineState state(this, NewPipeline, key, d->newPipeline);
    d->activePipelines << state;
    state.step();
}

/**
 * Starts a run of the "modified" pipeline for @p key.
 * @p data and @p size are currently not used.
 */
void Pipeline::modifiedEntity(const QByteArray &key, void *data, size_t size)
{
    Q_UNUSED(data);
    Q_UNUSED(size);

    PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline);
    d->activePipelines << state;
    state.step();
}

/**
 * Starts a run of the "deleted" pipeline for @p key.
 */
void Pipeline::deletedEntity(const QByteArray &key)
{
    PipelineState state(this, DeletedPipeline, key, d->deletedPipeline);
    d->activePipelines << state;
    state.step();
}

/**
 * Called by a PipelineState once a filter has finished with it; requests
 * another round of stepping.
 */
void Pipeline::pipelineStepped(const PipelineState &state)
{
    Q_UNUSED(state);
    scheduleStep();
}

/**
 * Queues a single call to stepPipelines() unless one is already pending.
 */
void Pipeline::scheduleStep()
{
    if (d->stepScheduled) {
        return;
    }

    d->stepScheduled = true;
    QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection);
}

/**
 * Advances every idle active pipeline run by one step.
 */
void Pipeline::stepPipelines()
{
    // Clear the flag first: a filter that completes synchronously calls back
    // into scheduleStep() while we are still looping, and that request must
    // queue a new round instead of being dropped.
    d->stepScheduled = false;

    // step() may end up in pipelineCompleted(), which removes entries from
    // activePipelines; iterate over a snapshot so the loop stays valid.
    // Copies of a PipelineState refer to the same run (operator== compares
    // the shared d pointers), so stepping a copy steps the real run.
    const QVector<PipelineState> snapshot = d->activePipelines;
    for (PipelineState state : snapshot) {
        if (state.isIdle()) {
            state.step();
        }
    }
}

/**
 * Removes a finished run from the active list, emits revisionUpdated() for
 * everything but the null pipeline and requests another round of stepping.
 */
void Pipeline::pipelineCompleted(const PipelineState &state)
{
    //TODO finalize the datastore, inform clients of the new rev

    // Read the type before touching the vector: 'state' may refer to the very
    // element that is about to be removed.
    const Type completedType = state.type();

    const int index = d->activePipelines.indexOf(state);
    if (index > -1) {
        d->activePipelines.remove(index);
    }

    if (completedType != NullPipeline) {
        emit revisionUpdated();
    }

    scheduleStep();
}

/**
 * Shared data of a PipelineState: the owning pipeline, the pipeline type,
 * the entity key, the position within the filter list and the idle flag.
 */
class PipelineState::Private : public QSharedData
{
public:
    Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, const QVector<Preprocessor *> &filters)
        : pipeline(p),
          type(t),
          key(k),
          filterIt(filters),
          idle(true)
    {
    }

    // State without a pipeline: step() and processingCompleted() do nothing.
    Private()
        : pipeline(nullptr),
          type(Pipeline::NullPipeline),
          filterIt(QVector<Preprocessor *>()),
          idle(true)
    {
    }

    Pipeline *pipeline;
    Pipeline::Type type;
    QByteArray key;
    QVectorIterator<Preprocessor *> filterIt;
    bool idle;
};

PipelineState::PipelineState()
    : d(new Private())
{
}

PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters)
    : d(new Private(pipeline, type, key, filters))
{
}

PipelineState::PipelineState(const PipelineState &other)
    : d(other.d)
{
}

PipelineState::~PipelineState()
{
}

PipelineState &PipelineState::operator=(const PipelineState &rhs)
{
    d = rhs.d;
    return *this;
}

/**
 * Two states are equal when they refer to the same private data.
 */
bool PipelineState::operator==(const PipelineState &rhs)
{
    return d == rhs.d;
}

bool PipelineState::isIdle() const
{
    return d->idle;
}

QByteArray PipelineState::key() const
{
    return d->key;
}

Pipeline::Type PipelineState::type() const
{
    return d->type;
}

/**
 * Hands this state to the next filter, or reports completion to the pipeline
 * when no filter is left. Does nothing for a state without a pipeline.
 */
void PipelineState::step()
{
    if (!d->pipeline) {
        return;
    }

    d->idle = false;
    if (d->filterIt.hasNext()) {
        d->filterIt.next()->process(*this);
    } else {
        d->pipeline->pipelineCompleted(*this);
    }
}

/**
 * Marks the state idle again and notifies the pipeline, but only when
 * @p filter is the filter that was handed this state most recently.
 */
void PipelineState::processingCompleted(Preprocessor *filter)
{
    // peekPrevious() must not be called before the first next(); check first.
    if (d->pipeline && d->filterIt.hasPrevious() && filter == d->filterIt.peekPrevious()) {
        d->idle = true;
        d->pipeline->pipelineStepped(*this);
    }
}

Preprocessor::Preprocessor()
    : d(nullptr)
{
}

Preprocessor::~Preprocessor()
{
}

/**
 * Default implementation: does no work and reports completion immediately.
 */
void Preprocessor::process(PipelineState state)
{
    processingCompleted(state);
}

/**
 * Tells @p state that this filter has finished with it.
 */
void Preprocessor::processingCompleted(PipelineState state)
{
    state.processingCompleted(this);
}

} // namespace Akonadi2