From d417f01e2eebeedfaae76b40667372bd0fb21fea Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 18 Jan 2015 14:29:08 +0100 Subject: Use jobs in queries, sync works again. --- common/clientapi.h | 26 +++++++++++-------- common/pipeline.cpp | 10 ++++++-- common/test/clientapitest.cpp | 14 ++++++----- dummyresource/facade.cpp | 23 +++++++++++------ dummyresource/facade.h | 4 +-- dummyresource/resourcefactory.cpp | 4 ++- tests/dummyresourcetest.cpp | 53 +++++++++++++++++++++++---------------- 7 files changed, 84 insertions(+), 50 deletions(-) diff --git a/common/clientapi.h b/common/clientapi.h index 2f1c127..dd11a0d 100644 --- a/common/clientapi.h +++ b/common/clientapi.h @@ -165,7 +165,7 @@ namespace Akonadi2 { /** * Standardized Domain Types * - * The don't adhere to any standard and can be freely extended + * They don't adhere to any standard and can be freely extended * Their sole purpose is providing a standardized interface to access data. * * This is necessary to decouple resource-backends from application domain containers (otherwise each resource would have to provide a faceade for each application domain container). @@ -297,6 +297,7 @@ using namespace async; class Query { public: + Query() : syncOnDemand(true) {} //Could also be a propertyFilter QStringList resources; //Could also be a propertyFilter @@ -305,6 +306,7 @@ public: QHash propertyFilter; //Properties to retrieve QSet requestedProperties; + bool syncOnDemand; }; @@ -324,7 +326,7 @@ public: virtual Async::Job create(const DomainType &domainObject) = 0; virtual Async::Job modify(const DomainType &domainObject) = 0; virtual Async::Job remove(const DomainType &domainObject) = 0; - virtual void load(const Query &query, const std::function &resultCallback, const std::function &completeCallback) = 0; + virtual Async::Job load(const Query &query, const std::function &resultCallback) = 0; }; @@ -418,21 +420,25 @@ public: // query tells us in which resources we're interested // TODO: queries to individual resources could be parallelized auto eventloop = QSharedPointer::create(); - int completeCounter = 0; + Async::Job job = Async::null(); for(const QString &resource : query.resources) { auto facade = FacadeFactory::instance().getFacade(resource); //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive. std::function addCallback = std::bind(&ResultProvider::add, resultSet, std::placeholders::_1); //We copy the facade pointer to keep it alive - facade->load(query, addCallback, [&completeCounter, &query, resultSet, facade, eventloop]() { - //TODO use jobs instead of this counter - completeCounter++; - if (completeCounter == query.resources.size()) { - resultSet->complete(); - eventloop->quit(); - } + job = job.then([facade, query, addCallback](Async::Future &future) { + Async::Job j = facade->load(query, addCallback); + j.then([&future, facade](Async::Future &f) { + future.setFinished(); + f.setFinished(); + }).exec(); }); } + job.then([eventloop, resultSet](Async::Future &future) { + resultSet->complete(); + eventloop->quit(); + future.setFinished(); + }).exec(); //The thread contains no eventloop, so execute one here eventloop->exec(QEventLoop::ExcludeUserInputEvents); }); diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 339a39c..dda7671 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -106,15 +106,21 @@ Async::Job Pipeline::newEntity(void const *command, size_t size) { flatbuffers::Verifier verifyer(reinterpret_cast(command), size); if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) { - qWarning() << "invalid buffer"; + qWarning() << "invalid buffer, not a create entity buffer"; return Async::null(); } } - auto createEntity = Akonadi2::Commands::GetCreateEntity(command); //TODO rename createEntitiy->domainType to bufferType const QString entityType = QString::fromUtf8(reinterpret_cast(createEntity->domainType()->Data()), createEntity->domainType()->size()); + { + flatbuffers::Verifier verifyer(reinterpret_cast(createEntity->delta()->Data()), createEntity->delta()->size()); + if (!Akonadi2::VerifyEntityBuffer(verifyer)) { + qWarning() << "invalid buffer, not an entity buffer"; + return Async::null(); + } + } auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); //Add metadata buffer diff --git a/common/test/clientapitest.cpp b/common/test/clientapitest.cpp index 16616a3..c9e4d6d 100644 --- a/common/test/clientapitest.cpp +++ b/common/test/clientapitest.cpp @@ -11,13 +11,15 @@ public: virtual Async::Job create(const Akonadi2::Domain::Event &domainObject){ return Async::null(); }; virtual Async::Job modify(const Akonadi2::Domain::Event &domainObject){ return Async::null(); }; virtual Async::Job remove(const Akonadi2::Domain::Event &domainObject){ return Async::null(); }; - virtual void load(const Akonadi2::Query &query, const std::function &resultCallback, const std::function &completeCallback) + virtual Async::Job load(const Akonadi2::Query &query, const std::function &resultCallback) { - qDebug() << "load called"; - for(const auto &result : results) { - resultCallback(result); - } - completeCallback(); + return Async::start([this, resultCallback](Async::Future &future) { + qDebug() << "load called"; + for(const auto &result : results) { + resultCallback(result); + } + future.setFinished(); + }); } QList results; diff --git a/dummyresource/facade.cpp b/dummyresource/facade.cpp index 13c174b..b7ba2c2 100644 --- a/dummyresource/facade.cpp +++ b/dummyresource/facade.cpp @@ -118,22 +118,29 @@ static std::function pre return preparedQuery; } -void DummyResourceFacade::synchronizeResource(const std::function &continuation) +Async::Job DummyResourceFacade::synchronizeResource(bool sync) { //TODO check if a sync is necessary //TODO Only sync what was requested //TODO timeout - mResourceAccess->synchronizeResource().then([continuation](Async::Future &f){ - continuation(); - f.setFinished(); - }).exec(); + + if (sync) { + return Async::start([=](Async::Future &future) { + mResourceAccess->open(); + mResourceAccess->synchronizeResource().then([&future](Async::Future &f) { + future.setFinished(); + f.setFinished(); + }).exec(); + }); + } + return Async::null(); } -void DummyResourceFacade::load(const Akonadi2::Query &query, const std::function &resultCallback, const std::function &completeCallback) +Async::Job DummyResourceFacade::load(const Akonadi2::Query &query, const std::function &resultCallback) { qDebug() << "load called"; - synchronizeResource([=]() { + return synchronizeResource(query.syncOnDemand).then([=](Async::Future &future) { qDebug() << "sync complete"; //Now that the sync is complete we can execute the query const auto preparedQuery = prepareQuery(query); @@ -188,7 +195,7 @@ void DummyResourceFacade::load(const Akonadi2::Query &query, const std::function } return true; }); - completeCallback(); + future.setFinished(); }); } diff --git a/dummyresource/facade.h b/dummyresource/facade.h index 9c8827a..da0b1d6 100644 --- a/dummyresource/facade.h +++ b/dummyresource/facade.h @@ -40,10 +40,10 @@ public: virtual Async::Job create(const Akonadi2::Domain::Event &domainObject); virtual Async::Job modify(const Akonadi2::Domain::Event &domainObject); virtual Async::Job remove(const Akonadi2::Domain::Event &domainObject); - virtual void load(const Akonadi2::Query &query, const std::function &resultCallback, const std::function &completeCallback); + virtual Async::Job load(const Akonadi2::Query &query, const std::function &resultCallback); private: - void synchronizeResource(const std::function &continuation); + Async::Job synchronizeResource(bool sync); QSharedPointer mResourceAccess; QSharedPointer > mFactory; }; diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index f74eb72..dc716ef 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp @@ -317,7 +317,9 @@ Async::Job DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli builder.add_attachment(attachment); auto buffer = builder.Finish(); DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); - enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, QByteArray::fromRawData(reinterpret_cast(m_fbb.GetBufferPointer()), m_fbb.GetSize())); + flatbuffers::FlatBufferBuilder entityFbb; + Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, m_fbb.GetBufferPointer(), m_fbb.GetSize(), 0, 0); + enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, QByteArray::fromRawData(reinterpret_cast(entityFbb.GetBufferPointer()), entityFbb.GetSize())); } else { //modification //TODO diff and create modification if necessary } diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp index c469796..a80d22f 100644 --- a/tests/dummyresourcetest.cpp +++ b/tests/dummyresourcetest.cpp @@ -82,27 +82,38 @@ private Q_SLOTS: QCOMPARE(revisionSpy.count(), 2); } - // void testResourceSync() - // { - // Akonadi2::Pipeline pipeline("org.kde.dummy"); - // DummyResource resource; - // auto job = resource.synchronizeWithSource(&pipeline); - // auto future = job.exec(); - // QTRY_VERIFY(future.isFinished()); - // } - - // void testSyncAndFacade() - // { - // Akonadi2::Query query; - // query.resources << "org.kde.dummy"; - - // async::SyncListResult result(Akonadi2::Store::load(query)); - // result.exec(); - // QVERIFY(!result.isEmpty()); - // auto value = result.first(); - // QVERIFY(!value->getProperty("summary").toString().isEmpty()); - // qDebug() << value->getProperty("summary").toString(); - // } + void testWriteToFacade() + { + Akonadi2::Query query; + Akonadi2::Domain::Event event; + event.setProperty("summary", "summaryValue"); + Akonadi2::Store::create(event, "org.kde.dummy"); + + QTest::qWait(1000); + //TODO wait for success response + } + + void testResourceSync() + { + Akonadi2::Pipeline pipeline("org.kde.dummy"); + DummyResource resource; + auto job = resource.synchronizeWithSource(&pipeline); + auto future = job.exec(); + QTRY_VERIFY(future.isFinished()); + } + + void testSyncAndFacade() + { + Akonadi2::Query query; + query.resources << "org.kde.dummy"; + + async::SyncListResult result(Akonadi2::Store::load(query)); + result.exec(); + QVERIFY(!result.isEmpty()); + auto value = result.first(); + QVERIFY(!value->getProperty("summary").toString().isEmpty()); + qDebug() << value->getProperty("summary").toString(); + } }; -- cgit v1.2.3