From 26816c21f60450e461a5b6ef4ef740f6070ce278 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 27 Jul 2016 02:26:47 +0200 Subject: Ported to the kasync revamp --- common/changereplay.cpp | 58 ++++----- common/genericresource.cpp | 134 ++++++++++----------- common/listener.cpp | 18 +-- common/messagequeue.cpp | 37 +----- common/pipeline.cpp | 6 +- common/queryrunner.cpp | 4 +- common/resourceaccess.cpp | 117 +++++++++--------- common/resourcecontrol.cpp | 29 ++--- common/resourcefacade.cpp | 6 +- common/sourcewriteback.cpp | 14 +-- common/store.cpp | 68 +++++------ common/synchronizer.cpp | 2 +- common/test.cpp | 2 +- examples/dummyresource/resourcefactory.cpp | 2 +- examples/imapresource/imapresource.cpp | 74 ++++++------ examples/imapresource/imapserverproxy.cpp | 35 +++--- examples/maildirresource/maildirresource.cpp | 18 +-- .../mailtransportresource.cpp | 7 +- sinksh/syntax_modules/sink_sync.cpp | 4 +- tests/accountstest.cpp | 8 +- tests/clientapitest.cpp | 4 +- tests/dummyresourcebenchmark.cpp | 30 +---- tests/mailsynctest.cpp | 22 ++-- tests/mailtest.cpp | 22 ++-- tests/resourcecommunicationtest.cpp | 28 ++--- 25 files changed, 337 insertions(+), 412 deletions(-) diff --git a/common/changereplay.cpp b/common/changereplay.cpp index fbd556f..e3b7158 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp @@ -72,7 +72,7 @@ KAsync::Job ChangeReplay::replayNextRevision() { auto lastReplayedRevision = QSharedPointer::create(0); auto topRevision = QSharedPointer::create(0); - return KAsync::start([this, lastReplayedRevision, topRevision]() { + return KAsync::syncStart([this, lastReplayedRevision, topRevision]() { mReplayInProgress = true; mMainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { SinkWarning() << error.message; @@ -90,11 +90,9 @@ KAsync::Job ChangeReplay::replayNextRevision() SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; }) .then(KAsync::dowhile( - [this, lastReplayedRevision, topRevision](KAsync::Future &future) { + [this, lastReplayedRevision, topRevision]() -> KAsync::Job { if (*lastReplayedRevision >= *topRevision) { - future.setValue(false); - future.setFinished(); - return; + return KAsync::value(KAsync::Break); } qint64 revision = *lastReplayedRevision + 1; @@ -109,12 +107,15 @@ KAsync::Job ChangeReplay::replayNextRevision() [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool { SinkTrace() << "Replaying " << key; if (canReplay(type, key, value)) { - replayJob = replay(type, key, value).then([this, revision, lastReplayedRevision]() { - recordReplayedRevision(revision); - *lastReplayedRevision = revision; - }, - [revision](int, QString) { - SinkTrace() << "Change replay failed" << revision; + replayJob = replay(type, key, value).then([this, revision, lastReplayedRevision](const KAsync::Error &error) { + if (error) { + SinkTrace() << "Change replay failed" << revision; + return KAsync::error(error); + } else { + recordReplayedRevision(revision); + *lastReplayedRevision = revision; + } + return KAsync::null(); }); exitLoop = true; } else { @@ -128,23 +129,26 @@ KAsync::Job ChangeReplay::replayNextRevision() } revision++; } - replayJob.then([this, revision, lastReplayedRevision, topRevision, &future]() { - SinkTrace() << "Replayed until " << revision; - recordReplayedRevision(*lastReplayedRevision); - QTimer::singleShot(0, [&future, lastReplayedRevision, topRevision]() { - future.setValue((*lastReplayedRevision < *topRevision)); - future.setFinished(); - }); - }, - [this, revision, &future](int, QString) { - SinkTrace() << "Change replay failed" << revision; - //We're probably not online or so, so postpone retrying - future.setValue(false); - future.setFinished(); - }).exec(); - + return replayJob.then([this, revision, lastReplayedRevision, topRevision](const KAsync::Error &error) ->KAsync::Job { + if (error) { + SinkTrace() << "Change replay failed" << revision; + //We're probably not online or so, so postpone retrying + return KAsync::value(KAsync::Break); + } else { + SinkTrace() << "Replayed until " << revision; + recordReplayedRevision(*lastReplayedRevision); + if (*lastReplayedRevision < *topRevision) { + return KAsync::wait(0).then(KAsync::value(KAsync::Continue)); + } else { + return KAsync::value(KAsync::Break); + } + } + //We shouldn't ever get here + Q_ASSERT(false); + return KAsync::value(KAsync::Break); + }); })) - .then([this, lastReplayedRevision]() { + .syncThen([this, lastReplayedRevision]() { recordReplayedRevision(*lastReplayedRevision); mMainStoreTransaction.abort(); if (allChangesReplayed()) { diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 7136882..f5b1775 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -100,7 +100,7 @@ private slots: } mProcessingLock = true; auto job = processPipeline() - .then([this]() { + .syncThen([this]() { mProcessingLock = false; if (messagesToProcessAvailable()) { process(); @@ -122,7 +122,8 @@ private slots: return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); case Sink::Commands::InspectionCommand: if (mInspect) { - return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size()).then([]() { return -1; }); + return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size()) + .syncThen([]() { return -1; }); } else { return KAsync::error(-1, "Missing inspection command."); } @@ -131,7 +132,7 @@ private slots: } } - KAsync::Job processQueuedCommand(const QByteArray &data) + KAsync::Job processQueuedCommand(const QByteArray &data) { flatbuffers::Verifier verifyer(reinterpret_cast(data.constData()), data.size()); if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { @@ -143,13 +144,13 @@ private slots: SinkTrace() << "Dequeued Command: " << Sink::Commands::name(commandId); return processQueuedCommand(queuedCommand) .then( - [this, commandId](qint64 createdRevision) -> qint64 { + [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job { + if (error) { + SinkWarning() << "Error while processing queue command: " << error.errorMessage; + return KAsync::error(error); + } SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId); - return createdRevision; - }, - [](int errorCode, QString errorMessage) { - // FIXME propagate error, we didn't handle it - SinkWarning() << "Error while processing queue command: " << errorMessage; + return KAsync::value(createdRevision); }); } @@ -157,31 +158,31 @@ private slots: KAsync::Job processQueue(MessageQueue *queue) { auto time = QSharedPointer::create(); - return KAsync::start([this]() { mPipeline->startTransaction(); }) - .then(KAsync::dowhile([queue]() { return !queue->isEmpty(); }, - [this, queue, time](KAsync::Future &future) { - queue->dequeueBatch(sBatchSize, - [this, time](const QByteArray &data) { - time->start(); - return KAsync::start([this, data, time](KAsync::Future &future) { - processQueuedCommand(data) - .then([&future, this, time](qint64 createdRevision) { - SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); - future.setFinished(); - }) - .exec(); - }); - }) - .then([&future, queue]() { future.setFinished(); }, - [&future](int i, QString error) { - if (i != MessageQueue::ErrorCodes::NoMessageFound) { - SinkWarning() << "Error while getting message from messagequeue: " << error; + return KAsync::syncStart([this]() { mPipeline->startTransaction(); }) + .then(KAsync::dowhile( + [this, queue, time]() -> KAsync::Job { + return queue->dequeueBatch(sBatchSize, + [this, time](const QByteArray &data) -> KAsync::Job { + time->start(); + return processQueuedCommand(data) + .syncThen([this, time](qint64 createdRevision) { + SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); + }); + }) + .then([queue](const KAsync::Error &error) { + if (error) { + if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) { + SinkWarning() << "Error while getting message from messagequeue: " << error.errorMessage; + } } - future.setFinished(); - }) - .exec(); + if (queue->isEmpty()) { + return KAsync::value(KAsync::Break); + } else { + return KAsync::value(KAsync::Continue); + } + }); })) - .then([this]() { mPipeline->commit(); }); + .syncThen([this](const KAsync::Error &) { mPipeline->commit(); }); } KAsync::Job processPipeline() @@ -198,18 +199,20 @@ private slots: // Go through all message queues auto it = QSharedPointer>::create(mCommandQueues); - return KAsync::dowhile([it]() { return it->hasNext(); }, - [it, this](KAsync::Future &future) { + return KAsync::dowhile( + [it, this]() { auto time = QSharedPointer::create(); time->start(); auto queue = it->next(); - processQueue(queue) - .then([this, &future, time]() { + return processQueue(queue) + .syncThen([this, time, it]() { SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed()); - future.setFinished(); - }) - .exec(); + if (it->hasNext()) { + return KAsync::Continue; + } + return KAsync::Break; + }); }); } @@ -251,22 +254,19 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra s >> expectedValue; inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue) .then( - [=]() { - Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; - Sink::Notification n; - n.type = Sink::Notification::Inspection; - n.id = inspectionId; - n.code = Sink::Notification::Success; - emit notify(n); - }, - [=](int code, const QString &message) { - Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << message; + [=](const KAsync::Error &error) { Sink::Notification n; n.type = Sink::Notification::Inspection; - n.message = message; n.id = inspectionId; - n.code = Sink::Notification::Failure; + if (error) { + Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << error.errorMessage; + n.code = Sink::Notification::Failure; + } else { + Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; + n.code = Sink::Notification::Success; + } emit notify(n); + return KAsync::null(); }) .exec(); return KAsync::null(); @@ -420,7 +420,7 @@ void GenericResource::processCommand(int commandId, const QByteArray &data) KAsync::Job GenericResource::synchronizeWithSource() { - return KAsync::start([this](KAsync::Future &future) { + return KAsync::start([this]() { Sink::Notification n; n.id = "sync"; @@ -432,23 +432,21 @@ KAsync::Job GenericResource::synchronizeWithSource() SinkLog() << " Synchronizing"; // Changereplay would deadlock otherwise when trying to open the synchronization store enableChangeReplay(false); - mSynchronizer->synchronize() - .then([this, &future]() { - SinkLog() << "Done Synchronizing"; - Sink::Notification n; - n.id = "sync"; - n.type = Sink::Notification::Status; - n.message = "Synchronization has ended."; - n.code = Sink::ApplicationDomain::ConnectedStatus; - emit notify(n); - - enableChangeReplay(true); - future.setFinished(); - }, [this, &future](int errorCode, const QString &error) { + return mSynchronizer->synchronize() + .then([this](const KAsync::Error &error) { enableChangeReplay(true); - future.setError(errorCode, error); - }) - .exec(); + if (!error) { + SinkLog() << "Done Synchronizing"; + Sink::Notification n; + n.id = "sync"; + n.type = Sink::Notification::Status; + n.message = "Synchronization has ended."; + n.code = Sink::ApplicationDomain::ConnectedStatus; + emit notify(n); + return KAsync::null(); + } + return KAsync::error(error); + }); }); } diff --git a/common/listener.cpp b/common/listener.cpp index a051293..027d9ae 100644 --- a/common/listener.cpp +++ b/common/listener.cpp @@ -248,13 +248,17 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c if (buffer->localSync()) { job = job.then(loadResource().processAllMessages()); } - job.then([callback, timer]() { - SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); - callback(true); - }, [callback](int errorCode, const QString &msg) { - SinkWarning() << "Sync failed: " << msg; - callback(false); - }) + job.then([callback, timer](const KAsync::Error &error) { + if (error) { + SinkWarning() << "Sync failed: " << error.errorMessage; + callback(false); + return KAsync::error(error); + } else { + SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); + callback(true); + return KAsync::null(); + } + }) .exec(); return; } else { diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index 3567a10..28eacb7 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp @@ -5,37 +5,6 @@ SINK_DEBUG_AREA("messagequeue") -static KAsync::Job waitForCompletion(QList> &futures) -{ - auto context = new QObject; - return KAsync::start([futures, context](KAsync::Future &future) { - const auto total = futures.size(); - auto count = QSharedPointer::create(); - int i = 0; - for (KAsync::Future subFuture : futures) { - i++; - if (subFuture.isFinished()) { - *count += 1; - continue; - } - // FIXME bind lifetime all watcher to future (repectively the main job - auto watcher = QSharedPointer>::create(); - QObject::connect(watcher.data(), &KAsync::FutureWatcher::futureReady, [count, total, &future]() { - *count += 1; - if (*count == total) { - future.setFinished(); - } - }); - watcher->setFuture(subFuture); - context->setProperty(QString("future%1").arg(i).toLatin1().data(), QVariant::fromValue(watcher)); - } - if (*count == total) { - future.setFinished(); - } - }) - .then([context]() { delete context; }); -} - MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Sink::Storage::ReadWrite) { } @@ -101,7 +70,7 @@ void MessageQueue::dequeue(const std::function([&value, resultHandler](KAsync::Future &future) { resultHandler(const_cast(static_cast(value.data())), value.size(), [&future](bool success) { future.setFinished(); }); }); - }).then([]() {}, [errorHandler](int error, const QString &errorString) { errorHandler(Error("messagequeue", error, errorString.toLatin1())); }).exec(); + }).onError([errorHandler](const KAsync::Error &error) { errorHandler(Error("messagequeue", error.errorCode, error.errorMessage.toLatin1())); }).exec(); } KAsync::Job MessageQueue::dequeueBatch(int maxBatchSize, const std::function(const QByteArray &)> &resultHandler) @@ -135,8 +104,8 @@ KAsync::Job MessageQueue::dequeueBatch(int maxBatchSize, const std::functi }); // Trace() << "Waiting on " << waitCondition.size() << " results"; - ::waitForCompletion(waitCondition) - .then([this, resultCount, &future]() { + KAsync::waitForCompletion(waitCondition) + .syncThen([this, resultCount, &future]() { processRemovals(); if (*resultCount == 0) { future.setError(static_cast(ErrorCodes::NoMessageFound), "No message found"); diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 1d45340..ce864f7 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -233,7 +233,7 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) d->storeNewRevision(newRevision, fbb, bufferType, key); - return KAsync::start([newRevision]() { return newRevision; }); + return KAsync::value(newRevision); } KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) @@ -346,7 +346,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) adaptorFactory->createBuffer(newAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); d->storeNewRevision(newRevision, fbb, bufferType, key); - return KAsync::start([newRevision]() { return newRevision; }); + return KAsync::value(newRevision); } KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) @@ -433,7 +433,7 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) processor->deletedEntity(key, newRevision, *current, d->transaction); } - return KAsync::start([newRevision]() { return newRevision; }); + return KAsync::value(newRevision); } void Pipeline::cleanupRevision(qint64 revision) diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 2e2e96d..052db39 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp @@ -86,7 +86,7 @@ QueryRunner::QueryRunner(const Sink::Query &query, const Sink::Resou const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); return newRevisionAndReplayedEntities; }) - .template then>([=](const QPair &newRevisionAndReplayedEntities) { + .template syncThen>([=](const QPair &newRevisionAndReplayedEntities) { mOffset[parentId] += newRevisionAndReplayedEntities.second; // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. if (query.liveQuery) { @@ -110,7 +110,7 @@ QueryRunner::QueryRunner(const Sink::Query &query, const Sink::Resou const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); return newRevisionAndReplayedEntities; }) - .template then >([query, this, resultProvider](const QPair &newRevisionAndReplayedEntities) { + .template syncThen >([query, this, resultProvider](const QPair &newRevisionAndReplayedEntities) { // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first); resultProvider->setRevision(newRevisionAndReplayedEntities.first); diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 7b4d839..364616c 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp @@ -159,67 +159,65 @@ KAsync::Job ResourceAccess::Private::tryToConnect() // We may have a socket from the last connection leftover socket.reset(); auto counter = QSharedPointer::create(0); - return KAsync::dowhile([this]() -> bool { return !socket; }, - [this, counter](KAsync::Future &future) { + return KAsync::dowhile( + [this, counter]() { SinkTrace() << "Loop"; - connectToServer(resourceInstanceIdentifier) - .then>( - [this, &future](const QSharedPointer &s) { - Q_ASSERT(s); - socket = s; - future.setFinished(); - }, - [&future, counter, this](int errorCode, const QString &errorString) { - static int waitTime = 10; - static int timeout = 500; - static int maxRetries = timeout / waitTime; - if (*counter > maxRetries) { - SinkTrace() << "Giving up"; - future.setError(-1, "Failed to connect to socket"); + return connectToServer(resourceInstanceIdentifier) + .then>( + [this, counter](const KAsync::Error &error, const QSharedPointer &s) { + if (error) { + static int waitTime = 10; + static int timeout = 500; + static int maxRetries = timeout / waitTime; + if (*counter > maxRetries) { + SinkTrace() << "Giving up"; + return KAsync::error("Failed to connect to socket"); + } else { + *counter = *counter + 1; + return KAsync::wait(waitTime).then(KAsync::value(KAsync::Continue)); + } } else { - KAsync::wait(waitTime).then([&future]() { future.setFinished(); }).exec(); + Q_ASSERT(s); + socket = s; + return KAsync::value(KAsync::Break); } - *counter = *counter + 1; - }) - .exec(); + }); }); } KAsync::Job ResourceAccess::Private::initializeSocket() { - return KAsync::start([this](KAsync::Future &future) { + return KAsync::start([this] { SinkTrace() << "Trying to connect"; - connectToServer(resourceInstanceIdentifier) + return connectToServer(resourceInstanceIdentifier) .then>( - [this, &future](const QSharedPointer &s) { - SinkTrace() << "Connected to resource, without having to start it."; - Q_ASSERT(s); - socket = s; - future.setFinished(); - }, - [this, &future](int errorCode, const QString &errorString) { - SinkTrace() << "Failed to connect, starting resource"; - // We failed to connect, so let's start the resource - QStringList args; - if (Sink::Test::testModeEnabled()) { - args << "--test"; - } - args << resourceInstanceIdentifier << resourceName; - qint64 pid = 0; - if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) { - SinkTrace() << "Started resource " << pid; - tryToConnect() - .then([&future]() { future.setFinished(); }, - [this, &future](int errorCode, const QString &errorString) { - SinkWarning() << "Failed to connect to started resource"; - future.setError(errorCode, errorString); - }) - .exec(); + [this](const KAsync::Error &error, const QSharedPointer &s) { + if (error) { + SinkTrace() << "Failed to connect, starting resource"; + // We failed to connect, so let's start the resource + QStringList args; + if (Sink::Test::testModeEnabled()) { + args << "--test"; + } + args << resourceInstanceIdentifier << resourceName; + qint64 pid = 0; + if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) { + SinkTrace() << "Started resource " << pid; + return tryToConnect() + .onError([this](const KAsync::Error &error) { + SinkWarning() << "Failed to connect to started resource"; + }); + } else { + SinkWarning() << "Failed to start resource"; + } + return KAsync::null(); } else { - SinkWarning() << "Failed to start resource"; + SinkTrace() << "Connected to resource, without having to start it."; + Q_ASSERT(s); + socket = s; + return KAsync::null(); } - }) - .exec(); + }); }); } @@ -383,17 +381,18 @@ void ResourceAccess::open() d->openingSocket = true; d->initializeSocket() .then( - [this, time]() { - SinkTrace() << "Socket is initialized." << Log::TraceTime(time->elapsed()); + [this, time](const KAsync::Error &error) { d->openingSocket = false; - QObject::connect(d->socket.data(), &QLocalSocket::disconnected, this, &ResourceAccess::disconnected); - QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(connectionError(QLocalSocket::LocalSocketError))); - QObject::connect(d->socket.data(), &QIODevice::readyRead, this, &ResourceAccess::readResourceMessage); - connected(); - }, - [this](int error, const QString &errorString) { - d->openingSocket = false; - SinkWarning() << "Failed to initialize socket " << errorString; + if (error) { + SinkWarning() << "Failed to initialize socket " << error.errorMessage; + } else { + SinkTrace() << "Socket is initialized." << Log::TraceTime(time->elapsed()); + QObject::connect(d->socket.data(), &QLocalSocket::disconnected, this, &ResourceAccess::disconnected); + QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(connectionError(QLocalSocket::LocalSocketError))); + QObject::connect(d->socket.data(), &QIODevice::readyRead, this, &ResourceAccess::readResourceMessage); + connected(); + } + return KAsync::null(); }) .exec(); } diff --git a/common/resourcecontrol.cpp b/common/resourcecontrol.cpp index 7d092a4..f509318 100644 --- a/common/resourcecontrol.cpp +++ b/common/resourcecontrol.cpp @@ -41,22 +41,22 @@ KAsync::Job ResourceControl::shutdown(const QByteArray &identifier) time->start(); return ResourceAccess::connectToServer(identifier) .then>( - [identifier, time](QSharedPointer socket, KAsync::Future &future) { + [identifier, time](const KAsync::Error &error, QSharedPointer socket) { + if (error) { + SinkTrace() << "Resource is already closed."; + // Resource isn't started, nothing to shutdown + return KAsync::null(); + } // We can't currently reuse the socket socket->close(); auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); resourceAccess->open(); - resourceAccess->sendCommand(Sink::Commands::ShutdownCommand) - .then([&future, resourceAccess, time]() { + return resourceAccess->sendCommand(Sink::Commands::ShutdownCommand) + .addToContext(resourceAccess) + .syncThen([resourceAccess, time]() { resourceAccess->close(); SinkTrace() << "Shutdown complete." << Log::TraceTime(time->elapsed()); - future.setFinished(); - }) - .exec(); - }, - [](int, const QString &) { - SinkTrace() << "Resource is already closed."; - // Resource isn't started, nothing to shutdown + }); }); } @@ -67,18 +67,19 @@ KAsync::Job ResourceControl::start(const QByteArray &identifier) time->start(); auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); resourceAccess->open(); - return resourceAccess->sendCommand(Sink::Commands::PingCommand).then([resourceAccess, time]() { SinkTrace() << "Start complete." << Log::TraceTime(time->elapsed()); }); + return resourceAccess->sendCommand(Sink::Commands::PingCommand).addToContext(resourceAccess).syncThen([time]() { SinkTrace() << "Start complete." << Log::TraceTime(time->elapsed()); }); } KAsync::Job ResourceControl::flushMessageQueue(const QByteArrayList &resourceIdentifier) { SinkTrace() << "flushMessageQueue" << resourceIdentifier; - return KAsync::iterate(resourceIdentifier) - .template each([](const QByteArray &resource, KAsync::Future &future) { + return KAsync::value(resourceIdentifier) + .template each([](const QByteArray &resource) { SinkTrace() << "Flushing message queue " << resource; auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); resourceAccess->open(); - resourceAccess->synchronizeResource(false, true).then([&future, resourceAccess]() { future.setFinished(); }).exec(); + return resourceAccess->synchronizeResource(false, true) + .addToContext(resourceAccess); }); } diff --git a/common/resourcefacade.cpp b/common/resourcefacade.cpp index bf4239d..1c56fe5 100644 --- a/common/resourcefacade.cpp +++ b/common/resourcefacade.cpp @@ -167,7 +167,7 @@ template KAsync::Job LocalStorageFacade::create(const DomainType &domainObject) { auto configStoreIdentifier = mIdentifier; - return KAsync::start([domainObject, configStoreIdentifier]() { + return KAsync::syncStart([domainObject, configStoreIdentifier]() { const QByteArray type = domainObject.getProperty("type").toByteArray(); const QByteArray providedIdentifier = domainObject.identifier().isEmpty() ? domainObject.getProperty("identifier").toByteArray() : domainObject.identifier(); const QByteArray identifier = providedIdentifier.isEmpty() ? ResourceConfig::newIdentifier(type) : providedIdentifier; @@ -192,7 +192,7 @@ template KAsync::Job LocalStorageFacade::modify(const DomainType &domainObject) { auto configStoreIdentifier = mIdentifier; - return KAsync::start([domainObject, configStoreIdentifier]() { + return KAsync::syncStart([domainObject, configStoreIdentifier]() { const QByteArray identifier = domainObject.identifier(); if (identifier.isEmpty()) { SinkWarning() << "We need an \"identifier\" property to identify the entity to configure."; @@ -220,7 +220,7 @@ template KAsync::Job LocalStorageFacade::remove(const DomainType &domainObject) { auto configStoreIdentifier = mIdentifier; - return KAsync::start([domainObject, configStoreIdentifier]() { + return KAsync::syncStart([domainObject, configStoreIdentifier]() { const QByteArray identifier = domainObject.identifier(); if (identifier.isEmpty()) { SinkWarning() << "We need an \"identifier\" property to identify the entity to configure"; diff --git a/common/sourcewriteback.cpp b/common/sourcewriteback.cpp index fe996cb..702d8e3 100644 --- a/common/sourcewriteback.cpp +++ b/common/sourcewriteback.cpp @@ -106,7 +106,7 @@ KAsync::Job SourceWriteBack::replay(const QByteArray &type, const QByteArr job = replay(mail, operation, oldRemoteId, modifiedProperties); } - return job.then([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { + return job.syncThen([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { if (operation == Sink::Operation_Creation) { SinkTrace() << "Replayed creation with remote id: " << remoteId; if (remoteId.isEmpty()) { @@ -127,13 +127,11 @@ KAsync::Job SourceWriteBack::replay(const QByteArray &type, const QByteArr } else { SinkError() << "Unkown operation" << operation; } - - mSyncStore.clear(); - mEntityStore.clear(); - mTransaction.abort(); - mSyncTransaction.commit(); - }, [this](int errorCode, const QString &errorMessage) { - SinkWarning() << "Failed to replay change: " << errorMessage; + }) + .syncThen([this](const KAsync::Error &error) { + if (error) { + SinkWarning() << "Failed to replay change: " << error.errorMessage; + } mSyncStore.clear(); mEntityStore.clear(); mTransaction.abort(); diff --git a/common/store.cpp b/common/store.cpp index 07f41f8..c01d220 100644 --- a/common/store.cpp +++ b/common/store.cpp @@ -39,6 +39,8 @@ SINK_DEBUG_AREA("store") Q_DECLARE_METATYPE(QSharedPointer>) +Q_DECLARE_METATYPE(QSharedPointer); +Q_DECLARE_METATYPE(std::shared_ptr); namespace Sink { @@ -169,12 +171,10 @@ QSharedPointer Store::loadModel(Query query) result.first.exec(); } - KAsync::iterate(resources.keys()) - .template each([query, aggregatingEmitter, resources](const QByteArray &resourceInstanceIdentifier, KAsync::Future &future) { + KAsync::value(resources.keys()) + .template each([query, aggregatingEmitter, resources](const QByteArray &resourceInstanceIdentifier) { const auto resourceType = resources.value(resourceInstanceIdentifier); - queryResource(resourceType, resourceInstanceIdentifier, query, aggregatingEmitter).template then([&future]() { - future.setFinished(); - }).exec(); + return queryResource(resourceType, resourceInstanceIdentifier, query, aggregatingEmitter); }) .exec(); model->fetchMore(QModelIndex()); @@ -201,7 +201,7 @@ KAsync::Job Store::create(const DomainType &domainObject) { // Potentially move to separate thread as well auto facade = getFacade(domainObject.resourceInstanceIdentifier()); - return facade->create(domainObject).template then([facade]() {}, [](int errorCode, const QString &error) { SinkWarning() << "Failed to create"; }); + return facade->create(domainObject).addToContext(std::shared_ptr(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to create"; }); } template @@ -209,7 +209,7 @@ KAsync::Job Store::modify(const DomainType &domainObject) { // Potentially move to separate thread as well auto facade = getFacade(domainObject.resourceInstanceIdentifier()); - return facade->modify(domainObject).template then([facade]() {}, [](int errorCode, const QString &error) { SinkWarning() << "Failed to modify"; }); + return facade->modify(domainObject).addToContext(std::shared_ptr(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to modify"; }); } template @@ -217,7 +217,7 @@ KAsync::Job Store::remove(const DomainType &domainObject) { // Potentially move to separate thread as well auto facade = getFacade(domainObject.resourceInstanceIdentifier()); - return facade->remove(domainObject).template then([facade]() {}, [](int errorCode, const QString &error) { SinkWarning() << "Failed to remove"; }); + return facade->remove(domainObject).addToContext(std::shared_ptr(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to remove"; }); } KAsync::Job Store::removeDataFromDisk(const QByteArray &identifier) @@ -231,6 +231,7 @@ KAsync::Job Store::removeDataFromDisk(const QByteArray &identifier) auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); resourceAccess->open(); return resourceAccess->sendCommand(Sink::Commands::RemoveFromDiskCommand) + .addToContext(resourceAccess) .then([resourceAccess](KAsync::Future &future) { if (resourceAccess->isReady()) { //Wait for the resource shutdown @@ -243,8 +244,8 @@ KAsync::Job Store::removeDataFromDisk(const QByteArray &identifier) future.setFinished(); } }) - .then([resourceAccess, time]() { - SinkTrace() << "Remove from disk complete." << Log::TraceTime(time->elapsed()); + .syncThen([time]() { + SinkTrace() << "Remove from disk complete." << Log::TraceTime(time->elapsed()); }); } @@ -253,41 +254,38 @@ KAsync::Job Store::synchronize(const Sink::Query &query) SinkTrace() << "synchronize" << query.resources; auto resources = getResources(query.resources, query.accounts).keys(); //FIXME only necessary because each doesn't propagate errors - auto error = new bool; - return KAsync::iterate(resources) - .template each([query, error](const QByteArray &resource, KAsync::Future &future) { + auto errorFlag = new bool; + return KAsync::value(resources) + .template each([query, errorFlag](const QByteArray &resource) { SinkTrace() << "Synchronizing " << resource; auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); resourceAccess->open(); - resourceAccess->synchronizeResource(true, false).then([resourceAccess, &future]() {SinkTrace() << "synced."; future.setFinished(); }, - [&future, error](int errorCode, QString msg) { *error = true; SinkWarning() << "Error during sync."; future.setError(errorCode, msg); }).exec(); - }).then([error](KAsync::Future &future) { - if (*error) { - future.setError(1, "Error during sync."); - } else { - future.setFinished(); + return resourceAccess->synchronizeResource(true, false) + .addToContext(resourceAccess) + .then([errorFlag](const KAsync::Error &error) { + if (error) { + *errorFlag = true; + SinkWarning() << "Error during sync."; + return KAsync::error(error); + } + SinkTrace() << "synced."; + return KAsync::null(); + }); + }) + .then([errorFlag]() { + if (*errorFlag) { + return KAsync::error("Error during sync."); } - delete error; + delete errorFlag; + return KAsync::null(); }); } template KAsync::Job Store::fetchOne(const Sink::Query &query) { - return KAsync::start([query](KAsync::Future &future) { - // FIXME We could do this more elegantly if composed jobs would have the correct type (In that case we'd simply return the value from then continuation, and could avoid the - // outer job entirely) - fetch(query, 1) - .template then>( - [&future](const QList &list) { - future.setValue(*list.first()); - future.setFinished(); - }, - [&future](int errorCode, const QString &errorMessage) { - future.setError(errorCode, errorMessage); - future.setFinished(); - }) - .exec(); + return fetch(query, 1).template then>([](const QList &list) { + return KAsync::value(*list.first()); }); } diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 2d4fb8d..15a06e7 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp @@ -244,7 +244,7 @@ KAsync::Job Synchronizer::synchronize() SinkTrace() << "Synchronizing"; mSyncInProgress = true; mMessageQueue->startTransaction(); - return synchronizeWithSource().then([this]() { + return synchronizeWithSource().syncThen([this]() { mSyncStore.clear(); mEntityStore.clear(); mMessageQueue->commit(); diff --git a/common/test.cpp b/common/test.cpp index 5b4c899..1a8e11d 100644 --- a/common/test.cpp +++ b/common/test.cpp @@ -156,7 +156,7 @@ public: } resultProvider->initialResultSetComplete(parent); }); - auto job = KAsync::start([query, resultProvider]() {}); + auto job = KAsync::syncStart([query, resultProvider]() {}); return qMakePair(job, emitter); } diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp index 0f7463f..221e20d 100644 --- a/examples/dummyresource/resourcefactory.cpp +++ b/examples/dummyresource/resourcefactory.cpp @@ -113,7 +113,7 @@ class DummySynchronizer : public Sink::Synchronizer { KAsync::Job synchronizeWithSource() Q_DECL_OVERRIDE { SinkLog() << " Synchronizing with the source"; - return KAsync::start([this]() { + return KAsync::syncStart([this]() { synchronize(ENTITY_TYPE_EVENT, DummyStore::instance().events(), [this](const QByteArray &ridBuffer, const QMap &data) { return createEvent(ridBuffer, data); }); diff --git a/examples/imapresource/imapresource.cpp b/examples/imapresource/imapresource.cpp index e199ea1..f78376a 100644 --- a/examples/imapresource/imapresource.cpp +++ b/examples/imapresource/imapresource.cpp @@ -56,6 +56,8 @@ SINK_DEBUG_AREA("imapresource") +Q_DECLARE_METATYPE(QSharedPointer) + using namespace Imap; using namespace Sink; @@ -217,22 +219,22 @@ public: SinkLog() << "Synchronizing mails" << folder.normalizedPath(); auto capabilities = imap->getCapabilities(); bool canDoIncrementalRemovals = false; - return KAsync::start([=]() { + return KAsync::syncStart([=]() { //TODO update flags }) - .then>([=]() { + .then([=]() { //TODO Remove what's no longer existing if (canDoIncrementalRemovals) { } else { - return imap->fetchUids(folder).then>([this, folder](const QVector &uids) { + return imap->fetchUids(folder).syncThen>([this, folder](const QVector &uids) { SinkTrace() << "Syncing removals"; synchronizeRemovals(folder.normalizedPath(), uids.toList().toSet()); commit(); - }).then([](){}); + }); } return KAsync::null(); }) - .then>([this, folder, imap]() { + .then([this, folder, imap]() { SinkTrace() << "About to fetch mail"; const auto uidNext = syncStore().readValue(folder.normalizedPath().toUtf8() + "uidnext").toLongLong(); auto maxUid = QSharedPointer::create(0); @@ -248,7 +250,7 @@ public: [](int progress, int total) { SinkTrace() << "Progress: " << progress << " out of " << total; }) - .then([this, maxUid, folder]() { + .syncThen([this, maxUid, folder]() { syncStore().writeValue(folder.normalizedPath().toUtf8() + "uidnext", QByteArray::number(*maxUid)); }); }); @@ -330,15 +332,12 @@ public: flags << Imap::Flags::Flagged; } QDateTime internalDate = mail.getDate(); - auto rid = QSharedPointer::create(); return login.then(imap->append(mailbox, content, flags, internalDate)) - .then([imap, mailbox, rid, mail](qint64 uid) { + .addToContext(imap) + .syncThen([mail](qint64 uid) { const auto remoteId = assembleMailRid(mail, uid); - //FIXME this get's called after the final error handler? WTF? SinkTrace() << "Finished creating a new mail: " << remoteId; - *rid = remoteId; - }).then([rid, imap]() { //FIXME fix KJob so we don't need this extra clause - return *rid; + return remoteId; }); } else if (operation == Sink::Operation_Removal) { const auto folderId = folderIdFromMailRid(oldRemoteId); @@ -348,7 +347,7 @@ public: KIMAP::ImapSet set; set.add(uid); return login.then(imap->remove(mailbox, set)) - .then([imap, oldRemoteId]() { + .syncThen([imap, oldRemoteId] { SinkTrace() << "Finished removing a mail: " << oldRemoteId; return QByteArray(); }); @@ -374,29 +373,24 @@ public: const QString oldMailbox = syncStore().resolveLocalId(ENTITY_TYPE_FOLDER, folderId); QByteArray content = KMime::LFtoCRLF(mail.getMimeMessage()); QDateTime internalDate = mail.getDate(); - auto rid = QSharedPointer::create(); KIMAP::ImapSet set; set.add(uid); return login.then(imap->append(mailbox, content, flags, internalDate)) - .then([imap, mailbox, rid, mail](qint64 uid) { + .addToContext(imap) + .then([=](qint64 uid) { const auto remoteId = assembleMailRid(mail, uid); SinkTrace() << "Finished creating a modified mail: " << remoteId; - *rid = remoteId; - }) - .then(imap->remove(oldMailbox, set)) - .then([rid, imap]() { - return *rid; + return imap->remove(oldMailbox, set).then(KAsync::value(remoteId)); }); } else { SinkTrace() << "Updating flags only."; KIMAP::ImapSet set; set.add(uid); return login.then(imap->select(mailbox)) + .addToContext(imap) .then(imap->storeFlags(set, flags)) - .then([imap, mailbox]() { + .syncThen([=] { SinkTrace() << "Finished modifying mail"; - }) - .then([oldRemoteId, imap]() { return oldRemoteId; }); } @@ -416,13 +410,13 @@ public: SinkTrace() << "Creating a new folder: " << parentFolder << folder.getName(); auto rid = QSharedPointer::create(); auto createFolder = login.then(imap->createSubfolder(parentFolder, folder.getName())) - .then([imap, rid](const QString &createdFolder) { + .syncThen([imap, rid](const QString &createdFolder) { SinkTrace() << "Finished creating a new folder: " << createdFolder; *rid = createdFolder.toUtf8(); }); if (folder.getSpecialPurpose().isEmpty()) { return createFolder - .then([rid](){ + .syncThen([rid](){ return *rid; }); } else { //We try to merge special purpose folders first @@ -435,7 +429,7 @@ public: }; } })) - .then>([specialPurposeFolders, folder, imap, parentFolder, rid]() -> KAsync::Job { + .then([specialPurposeFolders, folder, imap, parentFolder, rid]() -> KAsync::Job { for (const auto &purpose : folder.getSpecialPurpose()) { if (specialPurposeFolders->contains(purpose)) { auto f = specialPurposeFolders->value(purpose); @@ -446,13 +440,13 @@ public: } SinkTrace() << "No match found for merging, creating a new folder"; return imap->createSubfolder(parentFolder, folder.getName()) - .then([imap, rid](const QString &createdFolder) { + .syncThen([imap, rid](const QString &createdFolder) { SinkTrace() << "Finished creating a new folder: " << createdFolder; *rid = createdFolder.toUtf8(); }); }) - .then([rid](){ + .syncThen([rid](){ return *rid; }); return mergeJob; @@ -460,7 +454,7 @@ public: } else if (operation == Sink::Operation_Removal) { SinkTrace() << "Removing a folder: " << oldRemoteId; return login.then(imap->remove(oldRemoteId)) - .then([oldRemoteId, imap]() { + .syncThen([oldRemoteId, imap]() { SinkTrace() << "Finished removing a folder: " << oldRemoteId; return QByteArray(); }); @@ -468,11 +462,11 @@ public: SinkTrace() << "Renaming a folder: " << oldRemoteId << folder.getName(); auto rid = QSharedPointer::create(); return login.then(imap->renameSubfolder(oldRemoteId, folder.getName())) - .then([imap, rid](const QString &createdFolder) { + .syncThen([imap, rid](const QString &createdFolder) { SinkTrace() << "Finished renaming a folder: " << createdFolder; *rid = createdFolder.toUtf8(); }) - .then([rid](){ + .syncThen([rid](){ return *rid; }); } @@ -566,7 +560,8 @@ KAsync::Job ImapResource::inspect(int inspectionType, const QByteArray &in SinkTrace() << "Connecting to:" << mServer << mPort; SinkTrace() << "as:" << mUser; auto inspectionJob = imap->login(mUser, mPassword) - .then(imap->select(folderRemoteId).then([](){})) + .then(imap->select(folderRemoteId)) + .syncThen([](Imap::SelectResult){}) .then(imap->fetch(set, scope, [imap, messageByUid](const QVector &messages) { for (const auto &m : messages) { messageByUid->insert(m.uid, m); @@ -575,7 +570,7 @@ KAsync::Job ImapResource::inspect(int inspectionType, const QByteArray &in if (inspectionType == Sink::ResourceControl::Inspection::PropertyInspectionType) { if (property == "unread") { - return inspectionJob.then>([=]() { + return inspectionJob.then([=]() { auto msg = messageByUid->value(uid); if (expectedValue.toBool() && msg.flags.contains(Imap::Flags::Seen)) { return KAsync::error(1, "Expected unread but couldn't find it."); @@ -587,7 +582,7 @@ KAsync::Job ImapResource::inspect(int inspectionType, const QByteArray &in }); } if (property == "subject") { - return inspectionJob.then>([=]() { + return inspectionJob.then([=]() { auto msg = messageByUid->value(uid); if (msg.msg->subject(true)->asUnicodeString() != expectedValue.toString()) { return KAsync::error(1, "Subject not as expected: " + msg.msg->subject(true)->asUnicodeString()); @@ -597,7 +592,7 @@ KAsync::Job ImapResource::inspect(int inspectionType, const QByteArray &in } } if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) { - return inspectionJob.then>([=]() { + return inspectionJob.then([=]() { if (!messageByUid->contains(uid)) { SinkWarning() << "Existing messages are: " << messageByUid->keys(); SinkWarning() << "We're looking for: " << uid; @@ -628,20 +623,19 @@ KAsync::Job ImapResource::inspect(int inspectionType, const QByteArray &in scope.mode = KIMAP::FetchJob::FetchScope::Headers; auto imap = QSharedPointer::create(mServer, mPort); auto messageByUid = QSharedPointer>::create(); - auto inspectionJob = imap->login(mUser, mPassword) - .then(imap->select(remoteId).then([](){})) + return imap->login(mUser, mPassword) + .then(imap->select(remoteId).syncThen([](){})) .then(imap->fetch(set, scope, [=](const QVector &messages) { for (const auto &m : messages) { messageByUid->insert(m.uid, m); } })) - .then>([imap, messageByUid, expectedCount]() { + .then([imap, messageByUid, expectedCount]() { if (messageByUid->size() != expectedCount) { return KAsync::error(1, QString("Wrong number of messages on the server; found %1 instead of %2.").arg(messageByUid->size()).arg(expectedCount)); } return KAsync::null(); }); - return inspectionJob; } if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) { auto folderByPath = QSharedPointer>::create(); @@ -655,7 +649,7 @@ KAsync::Job ImapResource::inspect(int inspectionType, const QByteArray &in *folderByName << f.pathParts.last(); } })) - .then>([this, folderByName, folderByPath, folder, remoteId, imap]() { + .then([this, folderByName, folderByPath, folder, remoteId, imap]() { if (!folderByName->contains(folder.getName())) { SinkWarning() << "Existing folders are: " << *folderByPath; SinkWarning() << "We're looking for: " << folder.getName(); diff --git a/examples/imapresource/imapserverproxy.cpp b/examples/imapresource/imapserverproxy.cpp index a3d8d16..94367d8 100644 --- a/examples/imapresource/imapserverproxy.cpp +++ b/examples/imapresource/imapserverproxy.cpp @@ -161,7 +161,7 @@ KAsync::Job ImapServerProxy::login(const QString &username, const QString auto namespaceJob = new KIMAP::NamespaceJob(mSession); //FIXME The ping is only required because the login job doesn't fail after the configured timeout - return ping().then(runJob(loginJob)).then(runJob(capabilitiesJob)).then([this](){ + return ping().then(runJob(loginJob)).then(runJob(capabilitiesJob)).syncThen([this](){ SinkTrace() << "Supported capabilities: " << mCapabilities; QStringList requiredExtensions = QStringList() << "UIDPLUS" << "NAMESPACE"; for (const auto &requiredExtension : requiredExtensions) { @@ -170,7 +170,7 @@ KAsync::Job ImapServerProxy::login(const QString &username, const QString //TODO fail the job } } - }).then(runJob(namespaceJob)).then([this, namespaceJob](){ + }).then(runJob(namespaceJob)).syncThen([this, namespaceJob] { for (const auto &ns :namespaceJob->personalNamespaces()) { mPersonalNamespaces << ns.name; mPersonalNamespaceSeparator = ns.separator; @@ -363,7 +363,7 @@ KAsync::Job> ImapServerProxy::fetchHeaders(const QString &mailbox, list->append(uids.value(id)); } }) - .then>([list](){ + .syncThen>([list](){ return *list; }); } @@ -402,35 +402,34 @@ KAsync::Job ImapServerProxy::move(const QString &mailbox, const KIMAP::Ima KAsync::Job ImapServerProxy::createSubfolder(const QString &parentMailbox, const QString &folderName) { - auto folder = QSharedPointer::create(); - return KAsync::start>([this, parentMailbox, folderName, folder]() { + return KAsync::start([this, parentMailbox, folderName]() { Q_ASSERT(!mPersonalNamespaceSeparator.isNull()); + auto folder = QSharedPointer::create(); if (parentMailbox.isEmpty()) { *folder = mPersonalNamespaces.toList().first() + folderName; } else { *folder = parentMailbox + mPersonalNamespaceSeparator + folderName; } SinkTrace() << "Creating subfolder: " << *folder; - return create(*folder); - }) - .then([=]() { - return *folder; + return create(*folder) + .syncThen([=]() { + return *folder; + }); }); } KAsync::Job ImapServerProxy::renameSubfolder(const QString &oldMailbox, const QString &newName) { - auto folder = QSharedPointer::create(); - return KAsync::start>([this, oldMailbox, newName, folder]() { + return KAsync::start([this, oldMailbox, newName] { Q_ASSERT(!mPersonalNamespaceSeparator.isNull()); auto parts = oldMailbox.split(mPersonalNamespaceSeparator); parts.removeLast(); - *folder = parts.join(mPersonalNamespaceSeparator) + mPersonalNamespaceSeparator + newName; + auto folder = QSharedPointer::create(parts.join(mPersonalNamespaceSeparator) + mPersonalNamespaceSeparator + newName); SinkTrace() << "Renaming subfolder: " << oldMailbox << *folder; - return rename(oldMailbox, *folder); - }) - .then([=]() { - return *folder; + return rename(oldMailbox, *folder) + .syncThen([=]() { + return *folder; + }); }); } @@ -461,7 +460,7 @@ KAsync::Job ImapServerProxy::fetchMessages(const Folder &folder, qint64 ui auto time = QSharedPointer::create(); time->start(); Q_ASSERT(!mPersonalNamespaceSeparator.isNull()); - return select(mailboxFromFolder(folder)).then, SelectResult>([this, callback, folder, time, progress, uidNext](const SelectResult &selectResult) -> KAsync::Job { + return select(mailboxFromFolder(folder)).then([this, callback, folder, time, progress, uidNext](const SelectResult &selectResult) -> KAsync::Job { SinkLog() << "UIDNEXT " << selectResult.uidNext << uidNext; if (selectResult.uidNext == (uidNext + 1)) { @@ -469,7 +468,7 @@ KAsync::Job ImapServerProxy::fetchMessages(const Folder &folder, qint64 ui return KAsync::null(); } - return fetchHeaders(mailboxFromFolder(folder), (uidNext + 1)).then, QList>([this, callback, time, progress](const QList &uidsToFetch){ + return fetchHeaders(mailboxFromFolder(folder), (uidNext + 1)).then>([this, callback, time, progress](const QList &uidsToFetch){ SinkTrace() << "Fetched headers"; SinkTrace() << " Total: " << uidsToFetch.size(); SinkTrace() << " Uids to fetch: " << uidsToFetch; diff --git a/examples/maildirresource/maildirresource.cpp b/examples/maildirresource/maildirresource.cpp index 392b422..e69d822 100644 --- a/examples/maildirresource/maildirresource.cpp +++ b/examples/maildirresource/maildirresource.cpp @@ -367,7 +367,7 @@ public: KAsync::Job synchronizeWithSource() Q_DECL_OVERRIDE { SinkLog() << " Synchronizing"; - return KAsync::start >([this]() { + return KAsync::start([this]() { KPIM::Maildir maildir(mMaildirPath, true); if (!maildir.isValid(false)) { return KAsync::error(1, "Maildir path doesn't point to a valid maildir: " + mMaildirPath); @@ -402,18 +402,14 @@ public: if (operation == Sink::Operation_Creation) { const auto remoteId = getFilePathFromMimeMessagePath(mail.getMimeMessagePath()); SinkTrace() << "Mail created: " << remoteId; - return KAsync::start([=]() -> QByteArray { - return remoteId.toUtf8(); - }); + return KAsync::value(remoteId.toUtf8()); } else if (operation == Sink::Operation_Removal) { SinkTrace() << "Removing a mail: " << oldRemoteId; return KAsync::null(); } else if (operation == Sink::Operation_Modification) { SinkTrace() << "Modifying a mail: " << oldRemoteId; const auto remoteId = getFilePathFromMimeMessagePath(mail.getMimeMessagePath()); - return KAsync::start([=]() -> QByteArray { - return remoteId.toUtf8(); - }); + return KAsync::value(remoteId.toUtf8()); } return KAsync::null(); } @@ -427,9 +423,7 @@ public: SinkTrace() << "Creating a new folder: " << path; KPIM::Maildir maildir(path, false); maildir.create(); - return KAsync::start([=]() -> QByteArray { - return path.toUtf8(); - }); + return KAsync::value(path.toUtf8()); } else if (operation == Sink::Operation_Removal) { const auto path = oldRemoteId; SinkTrace() << "Removing a folder: " << path; @@ -438,9 +432,7 @@ public: return KAsync::null(); } else if (operation == Sink::Operation_Modification) { SinkWarning() << "Folder modifications are not implemented"; - return KAsync::start([=]() -> QByteArray { - return oldRemoteId; - }); + return KAsync::value(oldRemoteId); } return KAsync::null(); } diff --git a/examples/mailtransportresource/mailtransportresource.cpp b/examples/mailtransportresource/mailtransportresource.cpp index a729d4d..be4e4e0 100644 --- a/examples/mailtransportresource/mailtransportresource.cpp +++ b/examples/mailtransportresource/mailtransportresource.cpp @@ -124,17 +124,14 @@ public: }); auto job = KAsync::null(); for (const auto &m : toSend) { - job = job.then(send(m, mSettings)).then([this, m]() { + job = job.then(send(m, mSettings)).syncThen([this, m] { auto modifiedMail = ApplicationDomain::Mail(mResourceInstanceIdentifier, m.identifier(), m.revision(), QSharedPointer::create()); modifiedMail.setSent(true); modify(modifiedMail); //TODO copy to a sent mail folder as well }); } - job = job.then([&future]() { - future.setFinished(); - }, - [&future](int errorCode, const QString &errorString) { + job = job.syncThen([&future](const KAsync::Error &) { future.setFinished(); }); job.exec(); diff --git a/sinksh/syntax_modules/sink_sync.cpp b/sinksh/syntax_modules/sink_sync.cpp index 208b869..2ed4cf7 100644 --- a/sinksh/syntax_modules/sink_sync.cpp +++ b/sinksh/syntax_modules/sink_sync.cpp @@ -45,10 +45,10 @@ bool sync(const QStringList &args, State &state) } QTimer::singleShot(0, [query, state]() { - Sink::Store::synchronize(query).then([state]() { + Sink::Store::synchronize(query).syncThen([state]() { state.printLine("Synchronization complete!"); state.commandFinished(); - }).exec(); + }).exec(); }); return true; diff --git a/tests/accountstest.cpp b/tests/accountstest.cpp index 4be8bd6..e0a99c2 100644 --- a/tests/accountstest.cpp +++ b/tests/accountstest.cpp @@ -39,7 +39,7 @@ private slots: account.setProperty("icon", accountIcon); Store::create(account).exec().waitForFinished(); - Store::fetchAll(Query()).then>([&](const QList &accounts) { + Store::fetchAll(Query()).syncThen>([&](const QList &accounts) { QCOMPARE(accounts.size(), 1); auto account = accounts.first(); QCOMPARE(account->getProperty("type").toString(), QString("maildir")); @@ -60,7 +60,7 @@ private slots: Store::create(resource).exec().waitForFinished(); - Store::fetchAll(Query()).then>([&](const QList &resources) { + Store::fetchAll(Query()).syncThen>([&](const QList &resources) { QCOMPARE(resources.size(), 1); auto resource = resources.first(); QCOMPARE(resource->getProperty("type").toString(), QString("sink.mailtransport")); @@ -74,7 +74,7 @@ private slots: identity.setProperty("account", account.identifier()); Store::create(identity).exec().waitForFinished(); - Store::fetchAll(Query()).then>([&](const QList &identities) { + Store::fetchAll(Query()).syncThen>([&](const QList &identities) { QCOMPARE(identities.size(), 1); }) .exec().waitForFinished(); @@ -82,7 +82,7 @@ private slots: Store::remove(resource).exec().waitForFinished(); - Store::fetchAll(Query()).then>([](const QList &resources) { + Store::fetchAll(Query()).syncThen>([](const QList &resources) { QCOMPARE(resources.size(), 0); }) .exec().waitForFinished(); diff --git a/tests/clientapitest.cpp b/tests/clientapitest.cpp index b1e49c4..b5405cf 100644 --- a/tests/clientapitest.cpp +++ b/tests/clientapitest.cpp @@ -70,7 +70,7 @@ public: } resultProvider->initialResultSetComplete(parent); }); - auto job = KAsync::start([query, resultProvider]() {}); + auto job = KAsync::syncStart([query, resultProvider]() {}); mResultProvider = resultProvider; return qMakePair(job, emitter); } @@ -273,7 +273,7 @@ private slots: bool gotValue = false; auto result = Sink::Store::fetchOne(query) - .then([&gotValue](const Sink::ApplicationDomain::Event &event) { gotValue = true; }) + .syncThen([&gotValue](const Sink::ApplicationDomain::Event &event) { gotValue = true; }) .exec(); result.waitForFinished(); QVERIFY(!result.errorCode()); diff --git a/tests/dummyresourcebenchmark.cpp b/tests/dummyresourcebenchmark.cpp index 72562c3..8636bf6 100644 --- a/tests/dummyresourcebenchmark.cpp +++ b/tests/dummyresourcebenchmark.cpp @@ -44,34 +44,6 @@ private slots: { } - static KAsync::Job waitForCompletion(QList> &futures) - { - auto context = new QObject; - return KAsync::start([futures, context](KAsync::Future &future) { - const auto total = futures.size(); - auto count = QSharedPointer::create(); - int i = 0; - for (KAsync::Future subFuture : futures) { - i++; - if (subFuture.isFinished()) { - *count += 1; - continue; - } - // FIXME bind lifetime all watcher to future (repectively the main job - auto watcher = QSharedPointer>::create(); - QObject::connect(watcher.data(), &KAsync::FutureWatcher::futureReady, [count, total, &future]() { - *count += 1; - if (*count == total) { - future.setFinished(); - } - }); - watcher->setFuture(subFuture); - context->setProperty(QString("future%1").arg(i).toLatin1().data(), QVariant::fromValue(watcher)); - } - }) - .then([context]() { delete context; }); - } - // Ensure we can process a command in less than 0.1s void testCommandResponsiveness() { @@ -120,7 +92,7 @@ private slots: event.setProperty("summary", "summaryValue"); waitCondition << Sink::Store::create(event).exec(); } - waitForCompletion(waitCondition).exec().waitForFinished(); + KAsync::waitForCompletion(waitCondition).exec().waitForFinished(); auto appendTime = time.elapsed(); // Ensure everything is processed diff --git a/tests/mailsynctest.cpp b/tests/mailsynctest.cpp index 4b797c8..9c57f0a 100644 --- a/tests/mailsynctest.cpp +++ b/tests/mailsynctest.cpp @@ -69,7 +69,7 @@ void MailSyncTest::testListFolders() //First figure out how many folders we have by default { auto job = Store::fetchAll(Query()) - .then>([&](const QList &folders) { + .syncThen>([&](const QList &folders) { QStringList names; for (const auto &folder : folders) { names << folder->getName(); @@ -88,7 +88,7 @@ void MailSyncTest::testListFolders() VERIFYEXEC(Store::synchronize(query)); ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); - auto job = Store::fetchAll(query).then>([=](const QList &folders) { + auto job = Store::fetchAll(query).syncThen>([=](const QList &folders) { QStringList names; QHash specialPurposeFolders; for (const auto &folder : folders) { @@ -130,7 +130,7 @@ void MailSyncTest::testListNewFolder() VERIFYEXEC(Store::synchronize(query)); ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); - auto job = Store::fetchAll(query).then>([](const QList &folders) { + auto job = Store::fetchAll(query).syncThen>([](const QList &folders) { QStringList names; for (const auto &folder : folders) { names << folder->getName(); @@ -155,7 +155,7 @@ void MailSyncTest::testListRemovedFolder() VERIFYEXEC(Store::synchronize(query)); ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); - auto job = Store::fetchAll(query).then>([](const QList &folders) { + auto job = Store::fetchAll(query).syncThen>([](const QList &folders) { QStringList names; for (const auto &folder : folders) { names << folder->getName(); @@ -180,7 +180,7 @@ void MailSyncTest::testListFolderHierarchy() VERIFYEXEC(Store::synchronize(query)); ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); - auto job = Store::fetchAll(query).then>([=](const QList &folders) { + auto job = Store::fetchAll(query).syncThen>([=](const QList &folders) { QHash map; for (const auto &folder : folders) { map.insert(folder->getName(), folder); @@ -223,7 +223,7 @@ void MailSyncTest::testListNewSubFolder() VERIFYEXEC(Store::synchronize(query)); ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); - auto job = Store::fetchAll(query).then>([](const QList &folders) { + auto job = Store::fetchAll(query).syncThen>([](const QList &folders) { QStringList names; for (const auto &folder : folders) { names << folder->getName(); @@ -251,7 +251,7 @@ void MailSyncTest::testListRemovedSubFolder() VERIFYEXEC(Store::synchronize(query)); ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); - auto job = Store::fetchAll(query).then>([](const QList &folders) { + auto job = Store::fetchAll(query).syncThen>([](const QList &folders) { QStringList names; for (const auto &folder : folders) { names << folder->getName(); @@ -271,7 +271,7 @@ void MailSyncTest::testListMails() VERIFYEXEC(Store::synchronize(query)); ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); - auto job = Store::fetchAll(query).then>([](const QList &mails) { + auto job = Store::fetchAll(query).syncThen>([](const QList &mails) { QCOMPARE(mails.size(), 1); QVERIFY(mails.first()->getSubject().startsWith(QString("[Nepomuk] Jenkins build is still unstable"))); const auto data = mails.first()->getMimeMessage(); @@ -300,7 +300,7 @@ void MailSyncTest::testResyncMails() VERIFYEXEC(Store::synchronize(query)); ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); - auto job = Store::fetchAll(query).then>([](const QList &mails) { + auto job = Store::fetchAll(query).syncThen>([](const QList &mails) { QCOMPARE(mails.size(), 1); }); VERIFYEXEC(job); @@ -325,7 +325,7 @@ void MailSyncTest::testFetchNewRemovedMessages() ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); { - auto job = Store::fetchAll(query).then>([](const QList &mails) { + auto job = Store::fetchAll(query).syncThen>([](const QList &mails) { QCOMPARE(mails.size(), 2); }); VERIFYEXEC(job); @@ -337,7 +337,7 @@ void MailSyncTest::testFetchNewRemovedMessages() ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); { - auto job = Store::fetchAll(query).then>([](const QList &mails) { + auto job = Store::fetchAll(query).syncThen>([](const QList &mails) { QCOMPARE(mails.size(), 1); }); VERIFYEXEC(job); diff --git a/tests/mailtest.cpp b/tests/mailtest.cpp index 908fb84..925fb70 100644 --- a/tests/mailtest.cpp +++ b/tests/mailtest.cpp @@ -66,7 +66,7 @@ void MailTest::testCreateModifyDeleteFolder() //First figure out how many folders we have by default { auto job = Store::fetchAll(Query()) - .then>([&](const QList &folders) { + .syncThen>([&](const QList &folders) { baseCount = folders.size(); }); VERIFYEXEC(job); @@ -83,7 +83,7 @@ void MailTest::testCreateModifyDeleteFolder() VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); { auto job = Store::fetchAll(Query::RequestedProperties(QByteArrayList() << Folder::Name::name << Folder::Icon::name)) - .then>([=](const QList &folders) { + .syncThen>([=](const QList &folders) { QCOMPARE(folders.size(), baseCount + 1); QHash foldersByName; for (const auto &folder : folders) { @@ -109,7 +109,7 @@ void MailTest::testCreateModifyDeleteFolder() VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); { auto job = Store::fetchAll(Query::RequestedProperties(QByteArrayList() << Folder::Name::name << Folder::Icon::name)) - .then>([=](const QList &folders) { + .syncThen>([=](const QList &folders) { QCOMPARE(folders.size(), baseCount + 1); QHash foldersByName; for (const auto &folder : folders) { @@ -130,7 +130,7 @@ void MailTest::testCreateModifyDeleteFolder() VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); { auto job = Store::fetchAll(Query::RequestedProperties(QByteArrayList() << Folder::Name::name << Folder::Icon::name)) - .then>([=](const QList &folders) { + .syncThen>([=](const QList &folders) { QCOMPARE(folders.size(), baseCount); }); VERIFYEXEC(job); @@ -160,7 +160,7 @@ void MailTest::testCreateModifyDeleteMail() VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); { auto job = Store::fetchAll(Query::RequestedProperties(QByteArrayList() << Mail::Folder::name << Mail::Subject::name << Mail::MimeMessage::name)) - .then>([=](const QList &mails) { + .syncThen>([=](const QList &mails) { QCOMPARE(mails.size(), 1); auto mail = *mails.first(); QCOMPARE(mail.getSubject(), subject); @@ -189,7 +189,7 @@ void MailTest::testCreateModifyDeleteMail() VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); { auto job = Store::fetchAll(Query::RequestedProperties(QByteArrayList() << Mail::Folder::name << Mail::Subject::name << Mail::MimeMessage::name)) - .then>([=](const QList &mails) { + .syncThen>([=](const QList &mails) { QCOMPARE(mails.size(), 1); auto mail = *mails.first(); QCOMPARE(mail.getSubject(), subject2); @@ -211,7 +211,7 @@ void MailTest::testCreateModifyDeleteMail() VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); { auto job = Store::fetchAll(Query::RequestedProperties(QByteArrayList() << Mail::Folder::name << Mail::Subject::name)) - .then>([=](const QList &mails) { + .syncThen>([=](const QList &mails) { QCOMPARE(mails.size(), 0); }); VERIFYEXEC(job); @@ -247,7 +247,7 @@ void MailTest::testMoveMail() Mail modifiedMail; { auto job = Store::fetchAll(Query::RequestedProperties(QByteArrayList() << Mail::Folder::name << Mail::Subject::name << Mail::MimeMessage::name)) - .then>([=, &modifiedMail](const QList &mails) { + .syncThen>([=, &modifiedMail](const QList &mails) { QCOMPARE(mails.size(), 1); auto mail = *mails.first(); modifiedMail = mail; @@ -266,7 +266,7 @@ void MailTest::testMoveMail() VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); { auto job = Store::fetchAll(Query::RequestedProperties(QByteArrayList() << Mail::Folder::name << Mail::Subject::name << Mail::MimeMessage::name)) - .then>([=](const QList &mails) { + .syncThen>([=](const QList &mails) { QCOMPARE(mails.size(), 1); auto mail = *mails.first(); QCOMPARE(mail.getFolder(), folder1.identifier()); @@ -299,7 +299,7 @@ void MailTest::testMarkMailAsRead() auto job = Store::fetchAll(Query::ResourceFilter(mResourceInstanceIdentifier) + Query::RequestedProperties(QByteArrayList() << Mail::Folder::name << Mail::Subject::name)) - .then, QList>([this](const QList &mails) { + .then>([this](const QList &mails) { ASYNCCOMPARE(mails.size(), 1); auto mail = mails.first(); mail->setUnread(false); @@ -316,7 +316,7 @@ void MailTest::testMarkMailAsRead() << Mail::Subject::name << Mail::MimeMessage::name << Mail::Unread::name)) - .then, QList>([](const QList &mails) { + .then>([](const QList &mails) { ASYNCCOMPARE(mails.size(), 1); auto mail = mails.first(); ASYNCVERIFY(!mail->getSubject().isEmpty()); diff --git a/tests/resourcecommunicationtest.cpp b/tests/resourcecommunicationtest.cpp index 1530f63..201db53 100644 --- a/tests/resourcecommunicationtest.cpp +++ b/tests/resourcecommunicationtest.cpp @@ -51,13 +51,14 @@ private slots: int errors = 0; for (int i = 0; i < count; i++) { auto result = resourceAccess.sendCommand(Sink::Commands::PingCommand) - .then([&complete]() { complete++; }, - [&errors, &complete](int error, const QString &msg) { - qWarning() << msg; - errors++; - complete++; - }) - .exec(); + .syncThen([&resourceAccess, &errors, &complete](const KAsync::Error &error) { + complete++; + if (error) { + qWarning() << error.errorMessage; + errors++; + } + }) + .exec(); } QTRY_COMPARE(complete, count); QVERIFY(!errors); @@ -76,13 +77,12 @@ private slots: int errors = 0; for (int i = 0; i < count; i++) { resourceAccess.sendCommand(Sink::Commands::PingCommand) - .then([&complete]() { complete++; }, - [&errors, &complete](int error, const QString &msg) { - qWarning() << msg; + .syncThen([&resourceAccess, &errors, &complete](const KAsync::Error &error) { + complete++; + if (error) { + qWarning() << error.errorMessage; errors++; - complete++; - }) - .then([&resourceAccess]() { + } resourceAccess.close(); resourceAccess.open(); }) @@ -104,7 +104,7 @@ private slots: auto resourceAccess = Sink::ResourceAccessFactory::instance().getAccess(resourceIdentifier, ""); weakRef = resourceAccess.toWeakRef(); resourceAccess->open(); - resourceAccess->sendCommand(Sink::Commands::PingCommand).then([resourceAccess]() { qDebug() << "Pind complete"; }).exec(); + resourceAccess->sendCommand(Sink::Commands::PingCommand).syncThen([resourceAccess]() { qDebug() << "Ping complete"; }).exec(); } QVERIFY(weakRef.toStrongRef()); QTRY_VERIFY(!weakRef.toStrongRef()); -- cgit v1.2.3