From b43c0cf97615957e097daef29ff8febc1ec884c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dan=20Vr=C3=A1til?= Date: Fri, 15 May 2015 16:22:03 +0200 Subject: Adapt to KAsync namespace change --- common/CMakeLists.txt | 2 +- common/clientapi.cpp | 2 +- common/clientapi.h | 12 ++++++------ common/facade.h | 32 ++++++++++++++++---------------- common/genericresource.cpp | 32 ++++++++++++++++---------------- common/genericresource.h | 4 ++-- common/pipeline.cpp | 8 ++++---- common/pipeline.h | 2 +- common/resource.cpp | 8 ++++---- common/resource.h | 4 ++-- common/resourceaccess.cpp | 30 +++++++++++++++--------------- common/resourceaccess.h | 8 ++++---- 12 files changed, 72 insertions(+), 72 deletions(-) (limited to 'common') diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 19b23c8..4fb8a67 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -50,5 +50,5 @@ generate_flatbuffers( generate_export_header(${PROJECT_NAME} BASE_NAME Akonadi2Common EXPORT_FILE_NAME akonadi2common_export.h) SET_TARGET_PROPERTIES(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX) qt5_use_modules(${PROJECT_NAME} Network) -target_link_libraries(${PROJECT_NAME} ${storage_LIBS} akonadi2async) +target_link_libraries(${PROJECT_NAME} ${storage_LIBS} KF5Async) install(TARGETS ${PROJECT_NAME} ${KDE_INSTALL_TARGETS_DEFAULT_ARGS}) diff --git a/common/clientapi.cpp b/common/clientapi.cpp index e4608c8..d287fcf 100644 --- a/common/clientapi.cpp +++ b/common/clientapi.cpp @@ -42,7 +42,7 @@ QByteArray getTypeName() void Store::shutdown(const QByteArray &identifier) { Trace() << "shutdown"; - ResourceAccess::connectToServer(identifier).then>([identifier](const QSharedPointer &socket, Async::Future &future) { + ResourceAccess::connectToServer(identifier).then>([identifier](const QSharedPointer &socket, KAsync::Future &future) { //We can't currently reuse the socket socket->close(); auto resourceAccess = QSharedPointer::create(identifier); diff --git a/common/clientapi.h b/common/clientapi.h index 1bd8bdc..0ce1691 100644 --- a/common/clientapi.h +++ b/common/clientapi.h @@ -228,10 +228,10 @@ class StoreFacade { public: virtual ~StoreFacade(){}; QByteArray type() const { return ApplicationDomain::getTypeName(); } - virtual Async::Job create(const DomainType &domainObject) = 0; - virtual Async::Job modify(const DomainType &domainObject) = 0; - virtual Async::Job remove(const DomainType &domainObject) = 0; - virtual Async::Job load(const Query &query, const QSharedPointer > &resultProvider) = 0; + virtual KAsync::Job create(const DomainType &domainObject) = 0; + virtual KAsync::Job modify(const DomainType &domainObject) = 0; + virtual KAsync::Job remove(const DomainType &domainObject) = 0; + virtual KAsync::Job load(const Query &query, const QSharedPointer > &resultProvider) = 0; }; @@ -341,8 +341,8 @@ public: //The result provider must be threadsafe. async::run([query, resultSet](){ // Query all resources and aggregate results - Async::iterate(query.resources) - .template each([query, resultSet](const QByteArray &resource, Async::Future &future) { + KAsync::iterate(query.resources) + .template each([query, resultSet](const QByteArray &resource, KAsync::Future &future) { //TODO pass resource identifier to factory auto facade = FacadeFactory::instance().getFacade(resource); if (facade) { diff --git a/common/facade.h b/common/facade.h index 8c6578f..dcb30b6 100644 --- a/common/facade.h +++ b/common/facade.h @@ -44,13 +44,13 @@ class QueryRunner : public QObject { Q_OBJECT public: - typedef std::function(qint64 oldRevision, qint64 newRevision)> QueryFunction; + typedef std::function(qint64 oldRevision, qint64 newRevision)> QueryFunction; QueryRunner(const Akonadi2::Query &query) : mLatestRevision(0) {}; /** * Starts query */ - Async::Job run(qint64 newRevision = 0) + KAsync::Job run(qint64 newRevision = 0) { //TODO: JOBAPI: that last empty .then should not be necessary return queryFunction(mLatestRevision, newRevision).then([this](qint64 revision) { @@ -120,7 +120,7 @@ public: return Akonadi2::ApplicationDomain::getTypeName(); } - Async::Job create(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE + KAsync::Job create(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE { if (!mDomainTypeAdaptorFactory) { Warning() << "No domain type adaptor factory available"; @@ -130,25 +130,25 @@ public: return sendCreateCommand(bufferTypeForDomainType(), QByteArray::fromRawData(reinterpret_cast(entityFbb.GetBufferPointer()), entityFbb.GetSize())); } - Async::Job modify(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE + KAsync::Job modify(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE { //TODO - return Async::null(); + return KAsync::null(); } - Async::Job remove(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE + KAsync::Job remove(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE { //TODO - return Async::null(); + return KAsync::null(); } //TODO JOBAPI return job from sync continuation to execute it as subjob? - Async::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) Q_DECL_OVERRIDE + KAsync::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) Q_DECL_OVERRIDE { auto runner = QSharedPointer::create(query); QWeakPointer > weakResultProvider = resultProvider; - runner->setQuery([this, weakResultProvider, query] (qint64 oldRevision, qint64 newRevision) -> Async::Job { - return Async::start([this, weakResultProvider, query, oldRevision, newRevision](Async::Future &future) { + runner->setQuery([this, weakResultProvider, query] (qint64 oldRevision, qint64 newRevision) -> KAsync::Job { + return KAsync::start([this, weakResultProvider, query, oldRevision, newRevision](KAsync::Future &future) { auto resultProvider = weakResultProvider.toStrongRef(); if (!resultProvider) { Warning() << "Tried executing query after result provider is already gone"; @@ -175,7 +175,7 @@ public: } //We have to capture the runner to keep it alive - return synchronizeResource(query.syncOnDemand, query.processAll).template then([runner](Async::Future &future) { + return synchronizeResource(query.syncOnDemand, query.processAll).template then([runner](KAsync::Future &future) { runner->run().then([&future]() { future.setFinished(); }).exec(); @@ -183,7 +183,7 @@ public: } protected: - Async::Job sendCreateCommand(const QByteArray &resourceBufferType, const QByteArray &buffer) + KAsync::Job sendCreateCommand(const QByteArray &resourceBufferType, const QByteArray &buffer) { flatbuffers::FlatBufferBuilder fbb; //This is the resource buffer type and not the domain type @@ -195,7 +195,7 @@ protected: return mResourceAccess->sendCommand(Akonadi2::Commands::CreateEntityCommand, fbb); } - Async::Job synchronizeResource(bool sync, bool processAll) + KAsync::Job synchronizeResource(bool sync, bool processAll) { //TODO check if a sync is necessary //TODO Only sync what was requested @@ -203,17 +203,17 @@ protected: //TODO the synchronization should normally not be necessary: We just return what is already available. if (sync || processAll) { - return Async::start([=](Async::Future &future) { + return KAsync::start([=](KAsync::Future &future) { mResourceAccess->open(); mResourceAccess->synchronizeResource(sync, processAll).then([&future]() { future.setFinished(); }).exec(); }); } - return Async::null(); + return KAsync::null(); } - virtual Async::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider, qint64 oldRevision, qint64 newRevision) { return Async::null(); }; + virtual KAsync::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider, qint64 oldRevision, qint64 newRevision) { return KAsync::null(); }; protected: //TODO use one resource access instance per application => make static diff --git a/common/genericresource.cpp b/common/genericresource.cpp index ea6413b..2394b80 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -61,33 +61,33 @@ private slots: }).exec(); } - Async::Job processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) + KAsync::Job processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) { Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId()); //Throw command into appropriate pipeline switch (queuedCommand->commandId()) { case Akonadi2::Commands::DeleteEntityCommand: //mPipeline->removedEntity - return Async::null(); + return KAsync::null(); case Akonadi2::Commands::ModifyEntityCommand: //mPipeline->modifiedEntity - return Async::null(); + return KAsync::null(); case Akonadi2::Commands::CreateEntityCommand: return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); default: - return Async::error(-1, "Unhandled command"); + return KAsync::error(-1, "Unhandled command"); } - return Async::null(); + return KAsync::null(); } //Process all messages of this queue - Async::Job processQueue(MessageQueue *queue) + KAsync::Job processQueue(MessageQueue *queue) { //TODO use something like: - //Async::foreach("pass iterator here").each("process value here").join(); - //Async::foreach("pass iterator here").parallel("process value here").join(); - return Async::dowhile( - [this, queue](Async::Future &future) { + //KAsync::foreach("pass iterator here").each("process value here").join(); + //KAsync::foreach("pass iterator here").parallel("process value here").join(); + return KAsync::dowhile( + [this, queue](KAsync::Future &future) { if (queue->isEmpty()) { future.setValue(false); future.setFinished(); @@ -133,13 +133,13 @@ private slots: ); } - Async::Job processPipeline() + KAsync::Job processPipeline() { //Go through all message queues auto it = QSharedPointer >::create(mCommandQueues); - return Async::dowhile( + return KAsync::dowhile( [it]() { return it->hasNext(); }, - [it, this](Async::Future &future) { + [it, this](KAsync::Future &future) { auto queue = it->next(); processQueue(queue).then([&future]() { Trace() << "Queue processed"; @@ -206,12 +206,12 @@ void GenericResource::processCommand(int commandId, const QByteArray &data, uint enqueueCommand(mUserQueue, commandId, data); } -Async::Job GenericResource::processAllMessages() +KAsync::Job GenericResource::processAllMessages() { //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. //TODO: report errors while processing sync? //TODO JOBAPI: A helper that waits for n events and then continues? - return Async::start([this](Async::Future &f) { + return KAsync::start([this](KAsync::Future &f) { if (mSynchronizerQueue.isEmpty()) { f.setFinished(); } else { @@ -219,7 +219,7 @@ Async::Job GenericResource::processAllMessages() f.setFinished(); }); } - }).then([this](Async::Future &f) { + }).then([this](KAsync::Future &f) { if (mUserQueue.isEmpty()) { f.setFinished(); } else { diff --git a/common/genericresource.h b/common/genericresource.h index 36fa567..ac28575 100644 --- a/common/genericresource.h +++ b/common/genericresource.h @@ -38,8 +38,8 @@ public: virtual ~GenericResource(); virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline) Q_DECL_OVERRIDE; - virtual Async::Job synchronizeWithSource(Pipeline *pipeline) Q_DECL_OVERRIDE = 0; - virtual Async::Job processAllMessages() Q_DECL_OVERRIDE; + virtual KAsync::Job synchronizeWithSource(Pipeline *pipeline) Q_DECL_OVERRIDE = 0; + virtual KAsync::Job processAllMessages() Q_DECL_OVERRIDE; virtual void configurePipeline(Pipeline *pipeline) Q_DECL_OVERRIDE; int error() const; diff --git a/common/pipeline.cpp b/common/pipeline.cpp index e2f23ed..ea82720 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -94,7 +94,7 @@ void Pipeline::null() // state.step(); } -Async::Job Pipeline::newEntity(void const *command, size_t size) +KAsync::Job Pipeline::newEntity(void const *command, size_t size) { Log() << "Pipeline: New Entity"; @@ -107,7 +107,7 @@ Async::Job Pipeline::newEntity(void const *command, size_t size) flatbuffers::Verifier verifyer(reinterpret_cast(command), size); if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) { qWarning() << "invalid buffer, not a create entity buffer"; - return Async::error(); + return KAsync::error(); } } auto createEntity = Akonadi2::Commands::GetCreateEntity(command); @@ -118,7 +118,7 @@ Async::Job Pipeline::newEntity(void const *command, size_t size) flatbuffers::Verifier verifyer(reinterpret_cast(createEntity->delta()->Data()), createEntity->delta()->size()); if (!Akonadi2::VerifyEntityBuffer(verifyer)) { qWarning() << "invalid buffer, not an entity buffer"; - return Async::error(); + return KAsync::error(); } } auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); @@ -139,7 +139,7 @@ Async::Job Pipeline::newEntity(void const *command, size_t size) storage().setMaxRevision(newRevision); Log() << "Pipeline: wrote entity: "<< newRevision; - return Async::start([this, key, entityType](Async::Future &future) { + return KAsync::start([this, key, entityType](KAsync::Future &future) { PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() { future.setFinished(); }); diff --git a/common/pipeline.h b/common/pipeline.h index a574d27..d25fc56 100644 --- a/common/pipeline.h +++ b/common/pipeline.h @@ -53,7 +53,7 @@ public: void null(); - Async::Job newEntity(void const *command, size_t size); + KAsync::Job newEntity(void const *command, size_t size); void modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size); void deletedEntity(const QString &entityType, const QByteArray &key); diff --git a/common/resource.cpp b/common/resource.cpp index e158a40..bd69afd 100644 --- a/common/resource.cpp +++ b/common/resource.cpp @@ -53,16 +53,16 @@ void Resource::processCommand(int commandId, const QByteArray &data, uint size, pipeline->null(); } -Async::Job Resource::synchronizeWithSource(Pipeline *pipeline) +KAsync::Job Resource::synchronizeWithSource(Pipeline *pipeline) { - return Async::start([pipeline](Async::Future &f) { + return KAsync::start([pipeline](KAsync::Future &f) { pipeline->null(); }); } -Async::Job Resource::processAllMessages() +KAsync::Job Resource::processAllMessages() { - return Async::null(); + return KAsync::null(); } class ResourceFactory::Private diff --git a/common/resource.h b/common/resource.h index 18a6827..170e080 100644 --- a/common/resource.h +++ b/common/resource.h @@ -36,8 +36,8 @@ public: virtual ~Resource(); virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); - virtual Async::Job synchronizeWithSource(Pipeline *pipeline); - virtual Async::Job processAllMessages(); + virtual KAsync::Job synchronizeWithSource(Pipeline *pipeline); + virtual KAsync::Job processAllMessages(); virtual void configurePipeline(Pipeline *pipeline); diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index b7d569b..feffcf4 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp @@ -70,8 +70,8 @@ class ResourceAccess::Private { public: Private(const QByteArray &name, ResourceAccess *ra); - Async::Job tryToConnect(); - Async::Job initializeSocket(); + KAsync::Job tryToConnect(); + KAsync::Job initializeSocket(); QByteArray resourceName; QSharedPointer socket; QByteArray partialMessageBuffer; @@ -89,10 +89,10 @@ ResourceAccess::Private::Private(const QByteArray &name, ResourceAccess *q) } //Connects to server and returns connected socket on success -Async::Job > ResourceAccess::connectToServer(const QByteArray &identifier) +KAsync::Job > ResourceAccess::connectToServer(const QByteArray &identifier) { auto s = QSharedPointer::create(); - return Async::start >([identifier, s](Async::Future > &future) { + return KAsync::start >([identifier, s](KAsync::Future > &future) { s->setServerName(identifier); auto context = new QObject; QObject::connect(s.data(), &QLocalSocket::connected, context, [&future, &s, context]() { @@ -109,15 +109,15 @@ Async::Job > ResourceAccess::connectToServer(const }); } -Async::Job ResourceAccess::Private::tryToConnect() +KAsync::Job ResourceAccess::Private::tryToConnect() { - return Async::dowhile([this]() -> bool { + return KAsync::dowhile([this]() -> bool { //TODO abort after N retries? return !socket; }, - [this](Async::Future &future) { + [this](KAsync::Future &future) { Trace() << "Loop"; - Async::wait(50) + KAsync::wait(50) .then(connectToServer(resourceName)) .then >([this, &future](const QSharedPointer &s) { Q_ASSERT(s); @@ -130,9 +130,9 @@ Async::Job ResourceAccess::Private::tryToConnect() }); } -Async::Job ResourceAccess::Private::initializeSocket() +KAsync::Job ResourceAccess::Private::initializeSocket() { - return Async::start([this](Async::Future &future) { + return KAsync::start([this](KAsync::Future &future) { Trace() << "Trying to connect"; connectToServer(resourceName).then >([this, &future](const QSharedPointer &s) { Trace() << "Connected to resource, without having to start it."; @@ -189,9 +189,9 @@ void ResourceAccess::registerCallback(uint messageId, const std::functionresultHandler.insert(messageId, callback); } -Async::Job ResourceAccess::sendCommand(int commandId) +KAsync::Job ResourceAccess::sendCommand(int commandId) { - return Async::start([this, commandId](Async::Future &f) { + return KAsync::start([this, commandId](KAsync::Future &f) { auto continuation = [&f](int error, const QString &errorMessage) { if (error) { f.setError(error, errorMessage); @@ -205,11 +205,11 @@ Async::Job ResourceAccess::sendCommand(int commandId) }); } -Async::Job ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) +KAsync::Job ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) { //The flatbuffer is transient, but we want to store it until the job is executed QByteArray buffer(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()); - return Async::start([commandId, buffer, this](Async::Future &f) { + return KAsync::start([commandId, buffer, this](KAsync::Future &f) { auto callback = [&f](int error, const QString &errorMessage) { if (error) { f.setError(error, errorMessage); @@ -225,7 +225,7 @@ Async::Job ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu }); } -Async::Job ResourceAccess::synchronizeResource(bool sourceSync, bool localSync) +KAsync::Job ResourceAccess::synchronizeResource(bool sourceSync, bool localSync) { auto command = Akonadi2::CreateSynchronize(d->fbb, sourceSync, localSync); Akonadi2::FinishSynchronizeBuffer(d->fbb, command); diff --git a/common/resourceaccess.h b/common/resourceaccess.h index c16a9d2..648b12e 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h @@ -43,13 +43,13 @@ public: QByteArray resourceName() const; bool isReady() const; - Async::Job sendCommand(int commandId); - Async::Job sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); - Async::Job synchronizeResource(bool remoteSync, bool localSync); + KAsync::Job sendCommand(int commandId); + KAsync::Job sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); + KAsync::Job synchronizeResource(bool remoteSync, bool localSync); /** * Tries to connect to server, and returns a connected socket on success. */ - static Async::Job > connectToServer(const QByteArray &identifier); + static KAsync::Job > connectToServer(const QByteArray &identifier); public Q_SLOTS: void open(); -- cgit v1.2.3