From 26816c21f60450e461a5b6ef4ef740f6070ce278 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 27 Jul 2016 02:26:47 +0200 Subject: Ported to the kasync revamp --- common/changereplay.cpp | 58 +++++++++++--------- common/genericresource.cpp | 134 ++++++++++++++++++++++----------------------- common/listener.cpp | 18 +++--- common/messagequeue.cpp | 37 +------------ common/pipeline.cpp | 6 +- common/queryrunner.cpp | 4 +- common/resourceaccess.cpp | 117 ++++++++++++++++++++------------------- common/resourcecontrol.cpp | 29 +++++----- common/resourcefacade.cpp | 6 +- common/sourcewriteback.cpp | 14 ++--- common/store.cpp | 68 +++++++++++------------ common/synchronizer.cpp | 2 +- common/test.cpp | 2 +- 13 files changed, 233 insertions(+), 262 deletions(-) (limited to 'common') diff --git a/common/changereplay.cpp b/common/changereplay.cpp index fbd556f..e3b7158 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp @@ -72,7 +72,7 @@ KAsync::Job ChangeReplay::replayNextRevision() { auto lastReplayedRevision = QSharedPointer::create(0); auto topRevision = QSharedPointer::create(0); - return KAsync::start([this, lastReplayedRevision, topRevision]() { + return KAsync::syncStart([this, lastReplayedRevision, topRevision]() { mReplayInProgress = true; mMainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { SinkWarning() << error.message; @@ -90,11 +90,9 @@ KAsync::Job ChangeReplay::replayNextRevision() SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; }) .then(KAsync::dowhile( - [this, lastReplayedRevision, topRevision](KAsync::Future &future) { + [this, lastReplayedRevision, topRevision]() -> KAsync::Job { if (*lastReplayedRevision >= *topRevision) { - future.setValue(false); - future.setFinished(); - return; + return KAsync::value(KAsync::Break); } qint64 revision = *lastReplayedRevision + 1; @@ -109,12 +107,15 @@ KAsync::Job ChangeReplay::replayNextRevision() [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool { SinkTrace() << "Replaying " << key; if (canReplay(type, key, value)) { - replayJob = replay(type, key, value).then([this, revision, lastReplayedRevision]() { - recordReplayedRevision(revision); - *lastReplayedRevision = revision; - }, - [revision](int, QString) { - SinkTrace() << "Change replay failed" << revision; + replayJob = replay(type, key, value).then([this, revision, lastReplayedRevision](const KAsync::Error &error) { + if (error) { + SinkTrace() << "Change replay failed" << revision; + return KAsync::error(error); + } else { + recordReplayedRevision(revision); + *lastReplayedRevision = revision; + } + return KAsync::null(); }); exitLoop = true; } else { @@ -128,23 +129,26 @@ KAsync::Job ChangeReplay::replayNextRevision() } revision++; } - replayJob.then([this, revision, lastReplayedRevision, topRevision, &future]() { - SinkTrace() << "Replayed until " << revision; - recordReplayedRevision(*lastReplayedRevision); - QTimer::singleShot(0, [&future, lastReplayedRevision, topRevision]() { - future.setValue((*lastReplayedRevision < *topRevision)); - future.setFinished(); - }); - }, - [this, revision, &future](int, QString) { - SinkTrace() << "Change replay failed" << revision; - //We're probably not online or so, so postpone retrying - future.setValue(false); - future.setFinished(); - }).exec(); - + return replayJob.then([this, revision, lastReplayedRevision, topRevision](const KAsync::Error &error) ->KAsync::Job { + if (error) { + SinkTrace() << "Change replay failed" << revision; + //We're probably not online or so, so postpone retrying + return KAsync::value(KAsync::Break); + } else { + SinkTrace() << "Replayed until " << revision; + recordReplayedRevision(*lastReplayedRevision); + if (*lastReplayedRevision < *topRevision) { + return KAsync::wait(0).then(KAsync::value(KAsync::Continue)); + } else { + return KAsync::value(KAsync::Break); + } + } + //We shouldn't ever get here + Q_ASSERT(false); + return KAsync::value(KAsync::Break); + }); })) - .then([this, lastReplayedRevision]() { + .syncThen([this, lastReplayedRevision]() { recordReplayedRevision(*lastReplayedRevision); mMainStoreTransaction.abort(); if (allChangesReplayed()) { diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 7136882..f5b1775 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -100,7 +100,7 @@ private slots: } mProcessingLock = true; auto job = processPipeline() - .then([this]() { + .syncThen([this]() { mProcessingLock = false; if (messagesToProcessAvailable()) { process(); @@ -122,7 +122,8 @@ private slots: return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); case Sink::Commands::InspectionCommand: if (mInspect) { - return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size()).then([]() { return -1; }); + return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size()) + .syncThen([]() { return -1; }); } else { return KAsync::error(-1, "Missing inspection command."); } @@ -131,7 +132,7 @@ private slots: } } - KAsync::Job processQueuedCommand(const QByteArray &data) + KAsync::Job processQueuedCommand(const QByteArray &data) { flatbuffers::Verifier verifyer(reinterpret_cast(data.constData()), data.size()); if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { @@ -143,13 +144,13 @@ private slots: SinkTrace() << "Dequeued Command: " << Sink::Commands::name(commandId); return processQueuedCommand(queuedCommand) .then( - [this, commandId](qint64 createdRevision) -> qint64 { + [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job { + if (error) { + SinkWarning() << "Error while processing queue command: " << error.errorMessage; + return KAsync::error(error); + } SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId); - return createdRevision; - }, - [](int errorCode, QString errorMessage) { - // FIXME propagate error, we didn't handle it - SinkWarning() << "Error while processing queue command: " << errorMessage; + return KAsync::value(createdRevision); }); } @@ -157,31 +158,31 @@ private slots: KAsync::Job processQueue(MessageQueue *queue) { auto time = QSharedPointer::create(); - return KAsync::start([this]() { mPipeline->startTransaction(); }) - .then(KAsync::dowhile([queue]() { return !queue->isEmpty(); }, - [this, queue, time](KAsync::Future &future) { - queue->dequeueBatch(sBatchSize, - [this, time](const QByteArray &data) { - time->start(); - return KAsync::start([this, data, time](KAsync::Future &future) { - processQueuedCommand(data) - .then([&future, this, time](qint64 createdRevision) { - SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); - future.setFinished(); - }) - .exec(); - }); - }) - .then([&future, queue]() { future.setFinished(); }, - [&future](int i, QString error) { - if (i != MessageQueue::ErrorCodes::NoMessageFound) { - SinkWarning() << "Error while getting message from messagequeue: " << error; + return KAsync::syncStart([this]() { mPipeline->startTransaction(); }) + .then(KAsync::dowhile( + [this, queue, time]() -> KAsync::Job { + return queue->dequeueBatch(sBatchSize, + [this, time](const QByteArray &data) -> KAsync::Job { + time->start(); + return processQueuedCommand(data) + .syncThen([this, time](qint64 createdRevision) { + SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); + }); + }) + .then([queue](const KAsync::Error &error) { + if (error) { + if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) { + SinkWarning() << "Error while getting message from messagequeue: " << error.errorMessage; + } } - future.setFinished(); - }) - .exec(); + if (queue->isEmpty()) { + return KAsync::value(KAsync::Break); + } else { + return KAsync::value(KAsync::Continue); + } + }); })) - .then([this]() { mPipeline->commit(); }); + .syncThen([this](const KAsync::Error &) { mPipeline->commit(); }); } KAsync::Job processPipeline() @@ -198,18 +199,20 @@ private slots: // Go through all message queues auto it = QSharedPointer>::create(mCommandQueues); - return KAsync::dowhile([it]() { return it->hasNext(); }, - [it, this](KAsync::Future &future) { + return KAsync::dowhile( + [it, this]() { auto time = QSharedPointer::create(); time->start(); auto queue = it->next(); - processQueue(queue) - .then([this, &future, time]() { + return processQueue(queue) + .syncThen([this, time, it]() { SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed()); - future.setFinished(); - }) - .exec(); + if (it->hasNext()) { + return KAsync::Continue; + } + return KAsync::Break; + }); }); } @@ -251,22 +254,19 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra s >> expectedValue; inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue) .then( - [=]() { - Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; - Sink::Notification n; - n.type = Sink::Notification::Inspection; - n.id = inspectionId; - n.code = Sink::Notification::Success; - emit notify(n); - }, - [=](int code, const QString &message) { - Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << message; + [=](const KAsync::Error &error) { Sink::Notification n; n.type = Sink::Notification::Inspection; - n.message = message; n.id = inspectionId; - n.code = Sink::Notification::Failure; + if (error) { + Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << error.errorMessage; + n.code = Sink::Notification::Failure; + } else { + Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; + n.code = Sink::Notification::Success; + } emit notify(n); + return KAsync::null(); }) .exec(); return KAsync::null(); @@ -420,7 +420,7 @@ void GenericResource::processCommand(int commandId, const QByteArray &data) KAsync::Job GenericResource::synchronizeWithSource() { - return KAsync::start([this](KAsync::Future &future) { + return KAsync::start([this]() { Sink::Notification n; n.id = "sync"; @@ -432,23 +432,21 @@ KAsync::Job GenericResource::synchronizeWithSource() SinkLog() << " Synchronizing"; // Changereplay would deadlock otherwise when trying to open the synchronization store enableChangeReplay(false); - mSynchronizer->synchronize() - .then([this, &future]() { - SinkLog() << "Done Synchronizing"; - Sink::Notification n; - n.id = "sync"; - n.type = Sink::Notification::Status; - n.message = "Synchronization has ended."; - n.code = Sink::ApplicationDomain::ConnectedStatus; - emit notify(n); - - enableChangeReplay(true); - future.setFinished(); - }, [this, &future](int errorCode, const QString &error) { + return mSynchronizer->synchronize() + .then([this](const KAsync::Error &error) { enableChangeReplay(true); - future.setError(errorCode, error); - }) - .exec(); + if (!error) { + SinkLog() << "Done Synchronizing"; + Sink::Notification n; + n.id = "sync"; + n.type = Sink::Notification::Status; + n.message = "Synchronization has ended."; + n.code = Sink::ApplicationDomain::ConnectedStatus; + emit notify(n); + return KAsync::null(); + } + return KAsync::error(error); + }); }); } diff --git a/common/listener.cpp b/common/listener.cpp index a051293..027d9ae 100644 --- a/common/listener.cpp +++ b/common/listener.cpp @@ -248,13 +248,17 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c if (buffer->localSync()) { job = job.then(loadResource().processAllMessages()); } - job.then([callback, timer]() { - SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); - callback(true); - }, [callback](int errorCode, const QString &msg) { - SinkWarning() << "Sync failed: " << msg; - callback(false); - }) + job.then([callback, timer](const KAsync::Error &error) { + if (error) { + SinkWarning() << "Sync failed: " << error.errorMessage; + callback(false); + return KAsync::error(error); + } else { + SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); + callback(true); + return KAsync::null(); + } + }) .exec(); return; } else { diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index 3567a10..28eacb7 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp @@ -5,37 +5,6 @@ SINK_DEBUG_AREA("messagequeue") -static KAsync::Job waitForCompletion(QList> &futures) -{ - auto context = new QObject; - return KAsync::start([futures, context](KAsync::Future &future) { - const auto total = futures.size(); - auto count = QSharedPointer::create(); - int i = 0; - for (KAsync::Future subFuture : futures) { - i++; - if (subFuture.isFinished()) { - *count += 1; - continue; - } - // FIXME bind lifetime all watcher to future (repectively the main job - auto watcher = QSharedPointer>::create(); - QObject::connect(watcher.data(), &KAsync::FutureWatcher::futureReady, [count, total, &future]() { - *count += 1; - if (*count == total) { - future.setFinished(); - } - }); - watcher->setFuture(subFuture); - context->setProperty(QString("future%1").arg(i).toLatin1().data(), QVariant::fromValue(watcher)); - } - if (*count == total) { - future.setFinished(); - } - }) - .then([context]() { delete context; }); -} - MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Sink::Storage::ReadWrite) { } @@ -101,7 +70,7 @@ void MessageQueue::dequeue(const std::function([&value, resultHandler](KAsync::Future &future) { resultHandler(const_cast(static_cast(value.data())), value.size(), [&future](bool success) { future.setFinished(); }); }); - }).then([]() {}, [errorHandler](int error, const QString &errorString) { errorHandler(Error("messagequeue", error, errorString.toLatin1())); }).exec(); + }).onError([errorHandler](const KAsync::Error &error) { errorHandler(Error("messagequeue", error.errorCode, error.errorMessage.toLatin1())); }).exec(); } KAsync::Job MessageQueue::dequeueBatch(int maxBatchSize, const std::function(const QByteArray &)> &resultHandler) @@ -135,8 +104,8 @@ KAsync::Job MessageQueue::dequeueBatch(int maxBatchSize, const std::functi }); // Trace() << "Waiting on " << waitCondition.size() << " results"; - ::waitForCompletion(waitCondition) - .then([this, resultCount, &future]() { + KAsync::waitForCompletion(waitCondition) + .syncThen([this, resultCount, &future]() { processRemovals(); if (*resultCount == 0) { future.setError(static_cast(ErrorCodes::NoMessageFound), "No message found"); diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 1d45340..ce864f7 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -233,7 +233,7 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) d->storeNewRevision(newRevision, fbb, bufferType, key); - return KAsync::start([newRevision]() { return newRevision; }); + return KAsync::value(newRevision); } KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) @@ -346,7 +346,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) adaptorFactory->createBuffer(newAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); d->storeNewRevision(newRevision, fbb, bufferType, key); - return KAsync::start([newRevision]() { return newRevision; }); + return KAsync::value(newRevision); } KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) @@ -433,7 +433,7 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) processor->deletedEntity(key, newRevision, *current, d->transaction); } - return KAsync::start([newRevision]() { return newRevision; }); + return KAsync::value(newRevision); } void Pipeline::cleanupRevision(qint64 revision) diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 2e2e96d..052db39 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp @@ -86,7 +86,7 @@ QueryRunner::QueryRunner(const Sink::Query &query, const Sink::Resou const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); return newRevisionAndReplayedEntities; }) - .template then>([=](const QPair &newRevisionAndReplayedEntities) { + .template syncThen>([=](const QPair &newRevisionAndReplayedEntities) { mOffset[parentId] += newRevisionAndReplayedEntities.second; // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. if (query.liveQuery) { @@ -110,7 +110,7 @@ QueryRunner::QueryRunner(const Sink::Query &query, const Sink::Resou const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); return newRevisionAndReplayedEntities; }) - .template then >([query, this, resultProvider](const QPair &newRevisionAndReplayedEntities) { + .template syncThen >([query, this, resultProvider](const QPair &newRevisionAndReplayedEntities) { // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first); resultProvider->setRevision(newRevisionAndReplayedEntities.first); diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 7b4d839..364616c 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp @@ -159,67 +159,65 @@ KAsync::Job ResourceAccess::Private::tryToConnect() // We may have a socket from the last connection leftover socket.reset(); auto counter = QSharedPointer::create(0); - return KAsync::dowhile([this]() -> bool { return !socket; }, - [this, counter](KAsync::Future &future) { + return KAsync::dowhile( + [this, counter]() { SinkTrace() << "Loop"; - connectToServer(resourceInstanceIdentifier) - .then>( - [this, &future](const QSharedPointer &s) { - Q_ASSERT(s); - socket = s; - future.setFinished(); - }, - [&future, counter, this](int errorCode, const QString &errorString) { - static int waitTime = 10; - static int timeout = 500; - static int maxRetries = timeout / waitTime; - if (*counter > maxRetries) { - SinkTrace() << "Giving up"; - future.setError(-1, "Failed to connect to socket"); + return connectToServer(resourceInstanceIdentifier) + .then>( + [this, counter](const KAsync::Error &error, const QSharedPointer &s) { + if (error) { + static int waitTime = 10; + static int timeout = 500; + static int maxRetries = timeout / waitTime; + if (*counter > maxRetries) { + SinkTrace() << "Giving up"; + return KAsync::error("Failed to connect to socket"); + } else { + *counter = *counter + 1; + return KAsync::wait(waitTime).then(KAsync::value(KAsync::Continue)); + } } else { - KAsync::wait(waitTime).then([&future]() { future.setFinished(); }).exec(); + Q_ASSERT(s); + socket = s; + return KAsync::value(KAsync::Break); } - *counter = *counter + 1; - }) - .exec(); + }); }); } KAsync::Job ResourceAccess::Private::initializeSocket() { - return KAsync::start([this](KAsync::Future &future) { + return KAsync::start([this] { SinkTrace() << "Trying to connect"; - connectToServer(resourceInstanceIdentifier) + return connectToServer(resourceInstanceIdentifier) .then>( - [this, &future](const QSharedPointer &s) { - SinkTrace() << "Connected to resource, without having to start it."; - Q_ASSERT(s); - socket = s; - future.setFinished(); - }, - [this, &future](int errorCode, const QString &errorString) { - SinkTrace() << "Failed to connect, starting resource"; - // We failed to connect, so let's start the resource - QStringList args; - if (Sink::Test::testModeEnabled()) { - args << "--test"; - } - args << resourceInstanceIdentifier << resourceName; - qint64 pid = 0; - if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) { - SinkTrace() << "Started resource " << pid; - tryToConnect() - .then([&future]() { future.setFinished(); }, - [this, &future](int errorCode, const QString &errorString) { - SinkWarning() << "Failed to connect to started resource"; - future.setError(errorCode, errorString); - }) - .exec(); + [this](const KAsync::Error &error, const QSharedPointer &s) { + if (error) { + SinkTrace() << "Failed to connect, starting resource"; + // We failed to connect, so let's start the resource + QStringList args; + if (Sink::Test::testModeEnabled()) { + args << "--test"; + } + args << resourceInstanceIdentifier << resourceName; + qint64 pid = 0; + if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) { + SinkTrace() << "Started resource " << pid; + return tryToConnect() + .onError([this](const KAsync::Error &error) { + SinkWarning() << "Failed to connect to started resource"; + }); + } else { + SinkWarning() << "Failed to start resource"; + } + return KAsync::null(); } else { - SinkWarning() << "Failed to start resource"; + SinkTrace() << "Connected to resource, without having to start it."; + Q_ASSERT(s); + socket = s; + return KAsync::null(); } - }) - .exec(); + }); }); } @@ -383,17 +381,18 @@ void ResourceAccess::open() d->openingSocket = true; d->initializeSocket() .then( - [this, time]() { - SinkTrace() << "Socket is initialized." << Log::TraceTime(time->elapsed()); + [this, time](const KAsync::Error &error) { d->openingSocket = false; - QObject::connect(d->socket.data(), &QLocalSocket::disconnected, this, &ResourceAccess::disconnected); - QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(connectionError(QLocalSocket::LocalSocketError))); - QObject::connect(d->socket.data(), &QIODevice::readyRead, this, &ResourceAccess::readResourceMessage); - connected(); - }, - [this](int error, const QString &errorString) { - d->openingSocket = false; - SinkWarning() << "Failed to initialize socket " << errorString; + if (error) { + SinkWarning() << "Failed to initialize socket " << error.errorMessage; + } else { + SinkTrace() << "Socket is initialized." << Log::TraceTime(time->elapsed()); + QObject::connect(d->socket.data(), &QLocalSocket::disconnected, this, &ResourceAccess::disconnected); + QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(connectionError(QLocalSocket::LocalSocketError))); + QObject::connect(d->socket.data(), &QIODevice::readyRead, this, &ResourceAccess::readResourceMessage); + connected(); + } + return KAsync::null(); }) .exec(); } diff --git a/common/resourcecontrol.cpp b/common/resourcecontrol.cpp index 7d092a4..f509318 100644 --- a/common/resourcecontrol.cpp +++ b/common/resourcecontrol.cpp @@ -41,22 +41,22 @@ KAsync::Job ResourceControl::shutdown(const QByteArray &identifier) time->start(); return ResourceAccess::connectToServer(identifier) .then>( - [identifier, time](QSharedPointer socket, KAsync::Future &future) { + [identifier, time](const KAsync::Error &error, QSharedPointer socket) { + if (error) { + SinkTrace() << "Resource is already closed."; + // Resource isn't started, nothing to shutdown + return KAsync::null(); + } // We can't currently reuse the socket socket->close(); auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); resourceAccess->open(); - resourceAccess->sendCommand(Sink::Commands::ShutdownCommand) - .then([&future, resourceAccess, time]() { + return resourceAccess->sendCommand(Sink::Commands::ShutdownCommand) + .addToContext(resourceAccess) + .syncThen([resourceAccess, time]() { resourceAccess->close(); SinkTrace() << "Shutdown complete." << Log::TraceTime(time->elapsed()); - future.setFinished(); - }) - .exec(); - }, - [](int, const QString &) { - SinkTrace() << "Resource is already closed."; - // Resource isn't started, nothing to shutdown + }); }); } @@ -67,18 +67,19 @@ KAsync::Job ResourceControl::start(const QByteArray &identifier) time->start(); auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); resourceAccess->open(); - return resourceAccess->sendCommand(Sink::Commands::PingCommand).then([resourceAccess, time]() { SinkTrace() << "Start complete." << Log::TraceTime(time->elapsed()); }); + return resourceAccess->sendCommand(Sink::Commands::PingCommand).addToContext(resourceAccess).syncThen([time]() { SinkTrace() << "Start complete." << Log::TraceTime(time->elapsed()); }); } KAsync::Job ResourceControl::flushMessageQueue(const QByteArrayList &resourceIdentifier) { SinkTrace() << "flushMessageQueue" << resourceIdentifier; - return KAsync::iterate(resourceIdentifier) - .template each([](const QByteArray &resource, KAsync::Future &future) { + return KAsync::value(resourceIdentifier) + .template each([](const QByteArray &resource) { SinkTrace() << "Flushing message queue " << resource; auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); resourceAccess->open(); - resourceAccess->synchronizeResource(false, true).then([&future, resourceAccess]() { future.setFinished(); }).exec(); + return resourceAccess->synchronizeResource(false, true) + .addToContext(resourceAccess); }); } diff --git a/common/resourcefacade.cpp b/common/resourcefacade.cpp index bf4239d..1c56fe5 100644 --- a/common/resourcefacade.cpp +++ b/common/resourcefacade.cpp @@ -167,7 +167,7 @@ template KAsync::Job LocalStorageFacade::create(const DomainType &domainObject) { auto configStoreIdentifier = mIdentifier; - return KAsync::start([domainObject, configStoreIdentifier]() { + return KAsync::syncStart([domainObject, configStoreIdentifier]() { const QByteArray type = domainObject.getProperty("type").toByteArray(); const QByteArray providedIdentifier = domainObject.identifier().isEmpty() ? domainObject.getProperty("identifier").toByteArray() : domainObject.identifier(); const QByteArray identifier = providedIdentifier.isEmpty() ? ResourceConfig::newIdentifier(type) : providedIdentifier; @@ -192,7 +192,7 @@ template KAsync::Job LocalStorageFacade::modify(const DomainType &domainObject) { auto configStoreIdentifier = mIdentifier; - return KAsync::start([domainObject, configStoreIdentifier]() { + return KAsync::syncStart([domainObject, configStoreIdentifier]() { const QByteArray identifier = domainObject.identifier(); if (identifier.isEmpty()) { SinkWarning() << "We need an \"identifier\" property to identify the entity to configure."; @@ -220,7 +220,7 @@ template KAsync::Job LocalStorageFacade::remove(const DomainType &domainObject) { auto configStoreIdentifier = mIdentifier; - return KAsync::start([domainObject, configStoreIdentifier]() { + return KAsync::syncStart([domainObject, configStoreIdentifier]() { const QByteArray identifier = domainObject.identifier(); if (identifier.isEmpty()) { SinkWarning() << "We need an \"identifier\" property to identify the entity to configure"; diff --git a/common/sourcewriteback.cpp b/common/sourcewriteback.cpp index fe996cb..702d8e3 100644 --- a/common/sourcewriteback.cpp +++ b/common/sourcewriteback.cpp @@ -106,7 +106,7 @@ KAsync::Job SourceWriteBack::replay(const QByteArray &type, const QByteArr job = replay(mail, operation, oldRemoteId, modifiedProperties); } - return job.then([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { + return job.syncThen([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { if (operation == Sink::Operation_Creation) { SinkTrace() << "Replayed creation with remote id: " << remoteId; if (remoteId.isEmpty()) { @@ -127,13 +127,11 @@ KAsync::Job SourceWriteBack::replay(const QByteArray &type, const QByteArr } else { SinkError() << "Unkown operation" << operation; } - - mSyncStore.clear(); - mEntityStore.clear(); - mTransaction.abort(); - mSyncTransaction.commit(); - }, [this](int errorCode, const QString &errorMessage) { - SinkWarning() << "Failed to replay change: " << errorMessage; + }) + .syncThen([this](const KAsync::Error &error) { + if (error) { + SinkWarning() << "Failed to replay change: " << error.errorMessage; + } mSyncStore.clear(); mEntityStore.clear(); mTransaction.abort(); diff --git a/common/store.cpp b/common/store.cpp index 07f41f8..c01d220 100644 --- a/common/store.cpp +++ b/common/store.cpp @@ -39,6 +39,8 @@ SINK_DEBUG_AREA("store") Q_DECLARE_METATYPE(QSharedPointer>) +Q_DECLARE_METATYPE(QSharedPointer); +Q_DECLARE_METATYPE(std::shared_ptr); namespace Sink { @@ -169,12 +171,10 @@ QSharedPointer Store::loadModel(Query query) result.first.exec(); } - KAsync::iterate(resources.keys()) - .template each([query, aggregatingEmitter, resources](const QByteArray &resourceInstanceIdentifier, KAsync::Future &future) { + KAsync::value(resources.keys()) + .template each([query, aggregatingEmitter, resources](const QByteArray &resourceInstanceIdentifier) { const auto resourceType = resources.value(resourceInstanceIdentifier); - queryResource(resourceType, resourceInstanceIdentifier, query, aggregatingEmitter).template then([&future]() { - future.setFinished(); - }).exec(); + return queryResource(resourceType, resourceInstanceIdentifier, query, aggregatingEmitter); }) .exec(); model->fetchMore(QModelIndex()); @@ -201,7 +201,7 @@ KAsync::Job Store::create(const DomainType &domainObject) { // Potentially move to separate thread as well auto facade = getFacade(domainObject.resourceInstanceIdentifier()); - return facade->create(domainObject).template then([facade]() {}, [](int errorCode, const QString &error) { SinkWarning() << "Failed to create"; }); + return facade->create(domainObject).addToContext(std::shared_ptr(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to create"; }); } template @@ -209,7 +209,7 @@ KAsync::Job Store::modify(const DomainType &domainObject) { // Potentially move to separate thread as well auto facade = getFacade(domainObject.resourceInstanceIdentifier()); - return facade->modify(domainObject).template then([facade]() {}, [](int errorCode, const QString &error) { SinkWarning() << "Failed to modify"; }); + return facade->modify(domainObject).addToContext(std::shared_ptr(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to modify"; }); } template @@ -217,7 +217,7 @@ KAsync::Job Store::remove(const DomainType &domainObject) { // Potentially move to separate thread as well auto facade = getFacade(domainObject.resourceInstanceIdentifier()); - return facade->remove(domainObject).template then([facade]() {}, [](int errorCode, const QString &error) { SinkWarning() << "Failed to remove"; }); + return facade->remove(domainObject).addToContext(std::shared_ptr(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to remove"; }); } KAsync::Job Store::removeDataFromDisk(const QByteArray &identifier) @@ -231,6 +231,7 @@ KAsync::Job Store::removeDataFromDisk(const QByteArray &identifier) auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); resourceAccess->open(); return resourceAccess->sendCommand(Sink::Commands::RemoveFromDiskCommand) + .addToContext(resourceAccess) .then([resourceAccess](KAsync::Future &future) { if (resourceAccess->isReady()) { //Wait for the resource shutdown @@ -243,8 +244,8 @@ KAsync::Job Store::removeDataFromDisk(const QByteArray &identifier) future.setFinished(); } }) - .then([resourceAccess, time]() { - SinkTrace() << "Remove from disk complete." << Log::TraceTime(time->elapsed()); + .syncThen([time]() { + SinkTrace() << "Remove from disk complete." << Log::TraceTime(time->elapsed()); }); } @@ -253,41 +254,38 @@ KAsync::Job Store::synchronize(const Sink::Query &query) SinkTrace() << "synchronize" << query.resources; auto resources = getResources(query.resources, query.accounts).keys(); //FIXME only necessary because each doesn't propagate errors - auto error = new bool; - return KAsync::iterate(resources) - .template each([query, error](const QByteArray &resource, KAsync::Future &future) { + auto errorFlag = new bool; + return KAsync::value(resources) + .template each([query, errorFlag](const QByteArray &resource) { SinkTrace() << "Synchronizing " << resource; auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); resourceAccess->open(); - resourceAccess->synchronizeResource(true, false).then([resourceAccess, &future]() {SinkTrace() << "synced."; future.setFinished(); }, - [&future, error](int errorCode, QString msg) { *error = true; SinkWarning() << "Error during sync."; future.setError(errorCode, msg); }).exec(); - }).then([error](KAsync::Future &future) { - if (*error) { - future.setError(1, "Error during sync."); - } else { - future.setFinished(); + return resourceAccess->synchronizeResource(true, false) + .addToContext(resourceAccess) + .then([errorFlag](const KAsync::Error &error) { + if (error) { + *errorFlag = true; + SinkWarning() << "Error during sync."; + return KAsync::error(error); + } + SinkTrace() << "synced."; + return KAsync::null(); + }); + }) + .then([errorFlag]() { + if (*errorFlag) { + return KAsync::error("Error during sync."); } - delete error; + delete errorFlag; + return KAsync::null(); }); } template KAsync::Job Store::fetchOne(const Sink::Query &query) { - return KAsync::start([query](KAsync::Future &future) { - // FIXME We could do this more elegantly if composed jobs would have the correct type (In that case we'd simply return the value from then continuation, and could avoid the - // outer job entirely) - fetch(query, 1) - .template then>( - [&future](const QList &list) { - future.setValue(*list.first()); - future.setFinished(); - }, - [&future](int errorCode, const QString &errorMessage) { - future.setError(errorCode, errorMessage); - future.setFinished(); - }) - .exec(); + return fetch(query, 1).template then>([](const QList &list) { + return KAsync::value(*list.first()); }); } diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 2d4fb8d..15a06e7 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp @@ -244,7 +244,7 @@ KAsync::Job Synchronizer::synchronize() SinkTrace() << "Synchronizing"; mSyncInProgress = true; mMessageQueue->startTransaction(); - return synchronizeWithSource().then([this]() { + return synchronizeWithSource().syncThen([this]() { mSyncStore.clear(); mEntityStore.clear(); mMessageQueue->commit(); diff --git a/common/test.cpp b/common/test.cpp index 5b4c899..1a8e11d 100644 --- a/common/test.cpp +++ b/common/test.cpp @@ -156,7 +156,7 @@ public: } resultProvider->initialResultSetComplete(parent); }); - auto job = KAsync::start([query, resultProvider]() {}); + auto job = KAsync::syncStart([query, resultProvider]() {}); return qMakePair(job, emitter); } -- cgit v1.2.3