From 47b4442c585a25b2e4b857f2d9e3ab371d942c19 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 18 Jan 2015 10:51:34 +0100 Subject: Use jobs to track progress of write commands. --- common/clientapi.h | 23 +++++++++++++--- common/resourceaccess.cpp | 63 ++++++++++++++++++++++++++++--------------- common/resourceaccess.h | 5 ++-- common/test/clientapitest.cpp | 6 ++--- 4 files changed, 66 insertions(+), 31 deletions(-) (limited to 'common') diff --git a/common/clientapi.h b/common/clientapi.h index d2b1c9c..2f1c127 100644 --- a/common/clientapi.h +++ b/common/clientapi.h @@ -30,6 +30,7 @@ #include #include #include "threadboundary.h" +#include "async/src/async.h" namespace async { //This should abstract if we execute from eventloop or in thread. @@ -185,6 +186,11 @@ public: class MemoryBufferAdaptor : public BufferAdaptor { public: + MemoryBufferAdaptor() + : BufferAdaptor() + { + } + MemoryBufferAdaptor(const BufferAdaptor &buffer) : BufferAdaptor() { @@ -208,6 +214,11 @@ private: */ class AkonadiDomainType { public: + AkonadiDomainType() + :mAdaptor(new MemoryBufferAdaptor()) + { + + } AkonadiDomainType(const QString &resourceName, const QString &identifier, qint64 revision, const QSharedPointer &adaptor) : mAdaptor(adaptor), mResourceName(resourceName), @@ -310,9 +321,9 @@ class StoreFacade { public: virtual ~StoreFacade(){}; QString type() const { return Domain::getTypeName(); } - virtual void create(const DomainType &domainObject) = 0; - virtual void modify(const DomainType &domainObject) = 0; - virtual void remove(const DomainType &domainObject) = 0; + virtual Async::Job create(const DomainType &domainObject) = 0; + virtual Async::Job modify(const DomainType &domainObject) = 0; + virtual Async::Job remove(const DomainType &domainObject) = 0; virtual void load(const Query &query, const std::function &resultCallback, const std::function &completeCallback) = 0; }; @@ -440,11 +451,15 @@ public: /** * Create a new entity. */ + //TODO return job that tracks progress until resource has stored the message in it's queue? template static void create(const DomainType &domainObject, const QString &resourceIdentifier) { //Potentially move to separate thread as well auto facade = FacadeFactory::instance().getFacade(resourceIdentifier); - facade.create(domainObject); + auto job = facade->create(domainObject); + auto future = job.exec(); + future.waitForFinished(); + //TODO return job? } /** diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 9fb0d4c..7b13101 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp @@ -133,41 +133,62 @@ void ResourceAccess::registerCallback(uint messageId, const std::functionresultHandler.insert(messageId, callback); } -void ResourceAccess::sendCommand(int commandId, const std::function &callback) +Async::Job ResourceAccess::sendCommand(int commandId) { - if (isReady()) { - log(QString("Sending command %1").arg(commandId)); - d->messageId++; + return Async::start([this, commandId](Async::Future &f) { + if (isReady()) { + log(QString("Sending command %1").arg(commandId)); + d->messageId++; + registerCallback(d->messageId, [&f]() { f.setFinished(); }); + Commands::write(d->socket, d->messageId, commandId); + } else { + d->commandQueue << new QueuedCommand(commandId, [&f]() { f.setFinished(); }); + } + }); +} + +struct JobFinisher { + bool finished; + std::function callback; + + JobFinisher() : finished(false) {} + + void setFinished() { + finished = true; if (callback) { - registerCallback(d->messageId, callback); + callback(); } - Commands::write(d->socket, d->messageId, commandId); - } else { - d->commandQueue << new QueuedCommand(commandId, callback); } -} +}; -void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function &callback) +Async::Job ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) { + auto finisher = QSharedPointer::create(); + auto callback = [finisher] () { + finisher->setFinished(); + }; if (isReady()) { - log(QString("Sending command %1").arg(commandId)); d->messageId++; - if (callback) { - registerCallback(d->messageId, callback); - } + log(QString("Sending command %1 with messageId %2").arg(commandId).arg(d->messageId)); + registerCallback(d->messageId, callback); Commands::write(d->socket, d->messageId, commandId, fbb); } else { d->commandQueue << new QueuedCommand(commandId, fbb, callback); } + return Async::start([this, finisher](Async::Future &f) { + if (finisher->finished) { + f.setFinished(); + } else { + finisher->callback = [&f]() { + f.setFinished(); + }; + } + }); } Async::Job ResourceAccess::synchronizeResource() { - return Async::start([this](Async::Future &f) { - sendCommand(Commands::SynchronizeCommand, [&f]() { - f.setFinished(); - }); - }); + return sendCommand(Commands::SynchronizeCommand); } void ResourceAccess::open() @@ -214,7 +235,7 @@ void ResourceAccess::connected() log(QString("We have %1 queued commands").arg(d->commandQueue.size())); for (QueuedCommand *command: d->commandQueue) { d->messageId++; - log(QString("Sending command %1").arg(command->commandId)); + log(QString("Sending command %1 with messageId %2").arg(command->commandId).arg(d->messageId)); if (command->callback) { registerCallback(d->messageId, command->callback); } @@ -294,7 +315,7 @@ bool ResourceAccess::processMessageBuffer() } case Commands::CommandCompletion: { auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); - log(QString("Command %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); + log(QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first diff --git a/common/resourceaccess.h b/common/resourceaccess.h index 0a333f6..a9e8c47 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h @@ -41,9 +41,8 @@ public: QString resourceName() const; bool isReady() const; - //TODO use jobs - void sendCommand(int commandId, const std::function &callback = std::function()); - void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function &callback = std::function()); + Async::Job sendCommand(int commandId); + Async::Job sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); Async::Job synchronizeResource(); public Q_SLOTS: diff --git a/common/test/clientapitest.cpp b/common/test/clientapitest.cpp index 2ba0dcf..16616a3 100644 --- a/common/test/clientapitest.cpp +++ b/common/test/clientapitest.cpp @@ -8,9 +8,9 @@ class DummyResourceFacade : public Akonadi2::StoreFacade create(const Akonadi2::Domain::Event &domainObject){ return Async::null(); }; + virtual Async::Job modify(const Akonadi2::Domain::Event &domainObject){ return Async::null(); }; + virtual Async::Job remove(const Akonadi2::Domain::Event &domainObject){ return Async::null(); }; virtual void load(const Akonadi2::Query &query, const std::function &resultCallback, const std::function &completeCallback) { qDebug() << "load called"; -- cgit v1.2.3