From 47b4442c585a25b2e4b857f2d9e3ab371d942c19 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 18 Jan 2015 10:51:34 +0100 Subject: Use jobs to track progress of write commands. --- common/clientapi.h | 23 +++++++++++++--- common/resourceaccess.cpp | 63 ++++++++++++++++++++++++++++--------------- common/resourceaccess.h | 5 ++-- common/test/clientapitest.cpp | 6 ++--- dummyresource/facade.cpp | 14 ++++++---- dummyresource/facade.h | 6 ++--- synchronizer/listener.cpp | 5 +++- 7 files changed, 82 insertions(+), 40 deletions(-) diff --git a/common/clientapi.h b/common/clientapi.h index d2b1c9c..2f1c127 100644 --- a/common/clientapi.h +++ b/common/clientapi.h @@ -30,6 +30,7 @@ #include #include #include "threadboundary.h" +#include "async/src/async.h" namespace async { //This should abstract if we execute from eventloop or in thread. @@ -185,6 +186,11 @@ public: class MemoryBufferAdaptor : public BufferAdaptor { public: + MemoryBufferAdaptor() + : BufferAdaptor() + { + } + MemoryBufferAdaptor(const BufferAdaptor &buffer) : BufferAdaptor() { @@ -208,6 +214,11 @@ private: */ class AkonadiDomainType { public: + AkonadiDomainType() + :mAdaptor(new MemoryBufferAdaptor()) + { + + } AkonadiDomainType(const QString &resourceName, const QString &identifier, qint64 revision, const QSharedPointer &adaptor) : mAdaptor(adaptor), mResourceName(resourceName), @@ -310,9 +321,9 @@ class StoreFacade { public: virtual ~StoreFacade(){}; QString type() const { return Domain::getTypeName(); } - virtual void create(const DomainType &domainObject) = 0; - virtual void modify(const DomainType &domainObject) = 0; - virtual void remove(const DomainType &domainObject) = 0; + virtual Async::Job create(const DomainType &domainObject) = 0; + virtual Async::Job modify(const DomainType &domainObject) = 0; + virtual Async::Job remove(const DomainType &domainObject) = 0; virtual void load(const Query &query, const std::function &resultCallback, const std::function &completeCallback) = 0; }; @@ -440,11 +451,15 @@ public: /** * Create a new entity. */ + //TODO return job that tracks progress until resource has stored the message in it's queue? template static void create(const DomainType &domainObject, const QString &resourceIdentifier) { //Potentially move to separate thread as well auto facade = FacadeFactory::instance().getFacade(resourceIdentifier); - facade.create(domainObject); + auto job = facade->create(domainObject); + auto future = job.exec(); + future.waitForFinished(); + //TODO return job? } /** diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 9fb0d4c..7b13101 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp @@ -133,41 +133,62 @@ void ResourceAccess::registerCallback(uint messageId, const std::functionresultHandler.insert(messageId, callback); } -void ResourceAccess::sendCommand(int commandId, const std::function &callback) +Async::Job ResourceAccess::sendCommand(int commandId) { - if (isReady()) { - log(QString("Sending command %1").arg(commandId)); - d->messageId++; + return Async::start([this, commandId](Async::Future &f) { + if (isReady()) { + log(QString("Sending command %1").arg(commandId)); + d->messageId++; + registerCallback(d->messageId, [&f]() { f.setFinished(); }); + Commands::write(d->socket, d->messageId, commandId); + } else { + d->commandQueue << new QueuedCommand(commandId, [&f]() { f.setFinished(); }); + } + }); +} + +struct JobFinisher { + bool finished; + std::function callback; + + JobFinisher() : finished(false) {} + + void setFinished() { + finished = true; if (callback) { - registerCallback(d->messageId, callback); + callback(); } - Commands::write(d->socket, d->messageId, commandId); - } else { - d->commandQueue << new QueuedCommand(commandId, callback); } -} +}; -void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function &callback) +Async::Job ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) { + auto finisher = QSharedPointer::create(); + auto callback = [finisher] () { + finisher->setFinished(); + }; if (isReady()) { - log(QString("Sending command %1").arg(commandId)); d->messageId++; - if (callback) { - registerCallback(d->messageId, callback); - } + log(QString("Sending command %1 with messageId %2").arg(commandId).arg(d->messageId)); + registerCallback(d->messageId, callback); Commands::write(d->socket, d->messageId, commandId, fbb); } else { d->commandQueue << new QueuedCommand(commandId, fbb, callback); } + return Async::start([this, finisher](Async::Future &f) { + if (finisher->finished) { + f.setFinished(); + } else { + finisher->callback = [&f]() { + f.setFinished(); + }; + } + }); } Async::Job ResourceAccess::synchronizeResource() { - return Async::start([this](Async::Future &f) { - sendCommand(Commands::SynchronizeCommand, [&f]() { - f.setFinished(); - }); - }); + return sendCommand(Commands::SynchronizeCommand); } void ResourceAccess::open() @@ -214,7 +235,7 @@ void ResourceAccess::connected() log(QString("We have %1 queued commands").arg(d->commandQueue.size())); for (QueuedCommand *command: d->commandQueue) { d->messageId++; - log(QString("Sending command %1").arg(command->commandId)); + log(QString("Sending command %1 with messageId %2").arg(command->commandId).arg(d->messageId)); if (command->callback) { registerCallback(d->messageId, command->callback); } @@ -294,7 +315,7 @@ bool ResourceAccess::processMessageBuffer() } case Commands::CommandCompletion: { auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); - log(QString("Command %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); + log(QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first diff --git a/common/resourceaccess.h b/common/resourceaccess.h index 0a333f6..a9e8c47 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h @@ -41,9 +41,8 @@ public: QString resourceName() const; bool isReady() const; - //TODO use jobs - void sendCommand(int commandId, const std::function &callback = std::function()); - void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function &callback = std::function()); + Async::Job sendCommand(int commandId); + Async::Job sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); Async::Job synchronizeResource(); public Q_SLOTS: diff --git a/common/test/clientapitest.cpp b/common/test/clientapitest.cpp index 2ba0dcf..16616a3 100644 --- a/common/test/clientapitest.cpp +++ b/common/test/clientapitest.cpp @@ -8,9 +8,9 @@ class DummyResourceFacade : public Akonadi2::StoreFacade create(const Akonadi2::Domain::Event &domainObject){ return Async::null(); }; + virtual Async::Job modify(const Akonadi2::Domain::Event &domainObject){ return Async::null(); }; + virtual Async::Job remove(const Akonadi2::Domain::Event &domainObject){ return Async::null(); }; virtual void load(const Akonadi2::Query &query, const std::function &resultCallback, const std::function &completeCallback) { qDebug() << "load called"; diff --git a/dummyresource/facade.cpp b/dummyresource/facade.cpp index 668fbbf..13c174b 100644 --- a/dummyresource/facade.cpp +++ b/dummyresource/facade.cpp @@ -46,7 +46,7 @@ DummyResourceFacade::~DummyResourceFacade() { } -void DummyResourceFacade::create(const Akonadi2::Domain::Event &domainObject) +Async::Job DummyResourceFacade::create(const Akonadi2::Domain::Event &domainObject) { //Create message buffer and send to resource flatbuffers::FlatBufferBuilder eventFbb; @@ -64,24 +64,28 @@ void DummyResourceFacade::create(const Akonadi2::Domain::Event &domainObject) Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, eventFbb.GetBufferPointer(), eventFbb.GetSize(), 0, 0); flatbuffers::FlatBufferBuilder fbb; - auto type = fbb.CreateString(Akonadi2::Domain::getTypeName().toStdString().data()); + //This is the resource type and not the domain type + auto type = fbb.CreateString("event"); auto delta = fbb.CreateVector(entityFbb.GetBufferPointer(), entityFbb.GetSize()); Akonadi2::Commands::CreateEntityBuilder builder(fbb); builder.add_domainType(type); builder.add_delta(delta); auto location = builder.Finish(); Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); - mResourceAccess->sendCommand(Akonadi2::Commands::CreateEntityCommand, fbb); + mResourceAccess->open(); + return mResourceAccess->sendCommand(Akonadi2::Commands::CreateEntityCommand, fbb); } -void DummyResourceFacade::modify(const Akonadi2::Domain::Event &domainObject) +Async::Job DummyResourceFacade::modify(const Akonadi2::Domain::Event &domainObject) { //Create message buffer and send to resource + return Async::null(); } -void DummyResourceFacade::remove(const Akonadi2::Domain::Event &domainObject) +Async::Job DummyResourceFacade::remove(const Akonadi2::Domain::Event &domainObject) { //Create message buffer and send to resource + return Async::null(); } static std::function prepareQuery(const Akonadi2::Query &query) diff --git a/dummyresource/facade.h b/dummyresource/facade.h index e01d254..9c8827a 100644 --- a/dummyresource/facade.h +++ b/dummyresource/facade.h @@ -37,9 +37,9 @@ class DummyResourceFacade : public Akonadi2::StoreFacade create(const Akonadi2::Domain::Event &domainObject); + virtual Async::Job modify(const Akonadi2::Domain::Event &domainObject); + virtual Async::Job remove(const Akonadi2::Domain::Event &domainObject); virtual void load(const Akonadi2::Query &query, const std::function &resultCallback, const std::function &completeCallback); private: diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index dc0d9dd..a84623d 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp @@ -209,7 +209,10 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin case Akonadi2::Commands::ModifyEntityCommand: case Akonadi2::Commands::CreateEntityCommand: log(QString("\tCommand id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); - m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); + loadResource(); + if (m_resource) { + m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); + } break; default: if (commandId > Akonadi2::Commands::CustomCommand) { -- cgit v1.2.3