From d80ff84c28c0be626c1df4528741cddf5a55f547 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 21 Dec 2014 22:20:31 +0100 Subject: Write-Read loop from clientside. It's a huge hack but starts to show results. Most urgently we need: * reliable command results * the 3 buffers instead of the 1 * A way to implement storage as preprocessor (or a place to impelement it after the preprocessors). --- common/clientapi.h | 19 +++++++-- common/pipeline.cpp | 7 +++- common/resourceaccess.cpp | 20 ++++++++- common/resourceaccess.h | 1 + common/storage.h | 5 +++ common/storage_common.cpp | 19 +++++++++ common/storage_lmdb.cpp | 16 ++++--- common/test/clientapitest.cpp | 3 +- dummyresource/dummycalendar.fbs | 1 + dummyresource/facade.cpp | 63 +++++++++++++++++++--------- dummyresource/facade.h | 9 ++-- dummyresource/resourcefactory.cpp | 87 +++++++++++++++++++++++++++++++++++---- tests/CMakeLists.txt | 2 + tests/dummyresourcetest.cpp | 35 ++++++++++++++++ 14 files changed, 244 insertions(+), 43 deletions(-) create mode 100644 tests/dummyresourcetest.cpp diff --git a/common/clientapi.h b/common/clientapi.h index 6054130..d2757e7 100644 --- a/common/clientapi.h +++ b/common/clientapi.h @@ -278,7 +278,7 @@ public: virtual void create(const DomainType &domainObject) = 0; virtual void modify(const DomainType &domainObject) = 0; virtual void remove(const DomainType &domainObject) = 0; - virtual void load(const Query &query, const std::function &resultCallback) = 0; + virtual void load(const Query &query, const std::function &resultCallback, const std::function &completeCallback) = 0; }; @@ -353,7 +353,7 @@ class Store { public: static QString storageLocation() { - return QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2"; + return QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage"; } /** @@ -371,13 +371,24 @@ public: // Query all resources and aggregate results // query tells us in which resources we're interested // TODO: queries to individual resources could be parallelized + auto eventloop = QSharedPointer::create(); + int completeCounter = 0; for(const QString &resource : query.resources) { auto facade = FacadeFactory::instance().getFacade(resource); //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive. std::function addCallback = std::bind(&ResultProvider::add, resultSet, std::placeholders::_1); - facade->load(query, addCallback); + //We copy the facade pointer to keep it alive + facade->load(query, addCallback, [&completeCounter, &query, resultSet, facade, eventloop]() { + //TODO use jobs instead of this counter + completeCounter++; + if (completeCounter == query.resources.size()) { + resultSet->complete(); + eventloop->quit(); + } + }); } - resultSet->complete(); + //The thread contains no eventloop, so execute one here + eventloop->exec(QEventLoop::ExcludeUserInputEvents); }); return resultSet->emitter(); } diff --git a/common/pipeline.cpp b/common/pipeline.cpp index cf508c5..739909d 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -31,7 +31,7 @@ class Pipeline::Private { public: Private(const QString &resourceName) - : storage(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceName), + : storage(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceName, Storage::ReadWrite), stepScheduled(false) { } @@ -71,6 +71,11 @@ void Pipeline::null() void Pipeline::newEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entity) { + const qint64 newRevision = storage().maxRevision() + 1; + //FIXME this should go into a preprocessor + storage().write(key, key.size(), reinterpret_cast(entity.GetBufferPointer()), entity.GetSize()); + storage().setMaxRevision(newRevision); + PipelineState state(this, NewPipeline, key, d->newPipeline); d->activePipelines << state; state.step(); diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index a7e14f5..1706ac4 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp @@ -58,7 +58,7 @@ public: void write(QIODevice *device, uint messageId) { - Console::main()->log(QString("\tSending queued command %1").arg(m_commandId)); + // Console::main()->log(QString("\tSending queued command %1").arg(m_commandId)); Commands::write(device, messageId, m_commandId, m_buffer, m_bufferSize); } @@ -82,6 +82,7 @@ public: QByteArray partialMessageBuffer; flatbuffers::FlatBufferBuilder fbb; QVector commandQueue; + QVector > synchronizeResultHandler; uint messageId; }; @@ -149,6 +150,13 @@ void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder & } } +void ResourceAccess::synchronizeResource(const std::function &resultHandler) +{ + sendCommand(Commands::SynchronizeCommand); + //TODO: this should be implemented as a job, so we don't need to store the result handler as member + d->synchronizeResultHandler << resultHandler; +} + void ResourceAccess::open() { if (d->socket->isValid()) { @@ -262,6 +270,13 @@ bool ResourceAccess::processMessageBuffer() auto buffer = GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); log(QString("Revision updated to: %1").arg(buffer->revision())); emit revisionChanged(buffer->revision()); + + //FIXME: The result handler should be called on completion of the synchronize command, and not upon arbitrary revision updates. + for(auto handler : d->synchronizeResultHandler) { + //FIXME: we should associate the handler with a buffer->id() to avoid prematurely triggering the result handler from a delayed synchronized response (this is relevant for on-demand syncing). + handler(); + } + d->synchronizeResultHandler.clear(); break; } case Commands::CommandCompletion: { @@ -280,7 +295,8 @@ bool ResourceAccess::processMessageBuffer() void ResourceAccess::log(const QString &message) { - Console::main()->log(d->resourceName + ": " + message); + qDebug() << d->resourceName + ": " + message; + // Console::main()->log(d->resourceName + ": " + message); } } diff --git a/common/resourceaccess.h b/common/resourceaccess.h index 3a35af6..7416b25 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h @@ -42,6 +42,7 @@ public: void sendCommand(int commandId); void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); + void synchronizeResource(const std::function &resultHandler); public Q_SLOTS: void open(); diff --git a/common/storage.h b/common/storage.h index 57ee56c..6cfa3d6 100644 --- a/common/storage.h +++ b/common/storage.h @@ -71,6 +71,11 @@ public: static std::function basicErrorHandler(); qint64 diskUsage() const; void removeFromDisk() const; + + qint64 maxRevision(); + void setMaxRevision(qint64 revision); + + bool exists() const; private: class Private; Private * const d; diff --git a/common/storage_common.cpp b/common/storage_common.cpp index 8f465fc..bdae9dd 100644 --- a/common/storage_common.cpp +++ b/common/storage_common.cpp @@ -52,4 +52,23 @@ void Storage::scan(const std::string &sKey, const std::function bool { + r = QString::fromStdString(revision).toLongLong(); + return false; + }, + [](const Storage::Error &error) { + //Ignore the error in case we don't find the value + //TODO only ignore value not found errors + }); + return r; +} + } // namespace Akonadi2 diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp index a8ec378..eeb0045 100644 --- a/common/storage_lmdb.cpp +++ b/common/storage_lmdb.cpp @@ -59,6 +59,7 @@ QMutex Storage::Private::sMutex; Storage::Private::Private(const QString &s, const QString &n, AccessMode m) : storageRoot(s), name(n), + env(0), transaction(0), mode(m), readTransaction(false), @@ -66,7 +67,7 @@ Storage::Private::Private(const QString &s, const QString &n, AccessMode m) { const QString fullPath(storageRoot + '/' + name); QDir dir; - dir.mkdir(storageRoot); + dir.mkpath(storageRoot); dir.mkdir(fullPath); //This seems to resolve threading related issues, not sure why though @@ -97,8 +98,10 @@ Storage::Private::~Private() } // it is still there and still unused, so we can shut it down - mdb_dbi_close(env, dbi); - mdb_env_close(env); + if (env) { + mdb_dbi_close(env, dbi); + mdb_env_close(env); + } } Storage::Storage(const QString &storageRoot, const QString &name, AccessMode mode) @@ -111,6 +114,10 @@ Storage::~Storage() delete d; } +bool Storage::exists() const +{ + return (d->env != 0); +} bool Storage::isInTransaction() const { return d->transaction; @@ -313,12 +320,9 @@ void Storage::scan(const char *keyData, uint keySize, errorHandler(error); } - /** - we don't abort the transaction since we need it for reading the values if (implicitTransaction) { abortTransaction(); } - */ } qint64 Storage::diskUsage() const diff --git a/common/test/clientapitest.cpp b/common/test/clientapitest.cpp index 2d1c238..dd634f1 100644 --- a/common/test/clientapitest.cpp +++ b/common/test/clientapitest.cpp @@ -11,12 +11,13 @@ public: virtual void create(const Akonadi2::Domain::Event &domainObject){}; virtual void modify(const Akonadi2::Domain::Event &domainObject){}; virtual void remove(const Akonadi2::Domain::Event &domainObject){}; - virtual void load(const Akonadi2::Query &query, const std::function &resultCallback) + virtual void load(const Akonadi2::Query &query, const std::function &resultCallback, const std::function &completeCallback) { qDebug() << "load called"; for(const auto &result : results) { resultCallback(result); } + completeCallback(); } QList results; diff --git a/dummyresource/dummycalendar.fbs b/dummyresource/dummycalendar.fbs index 5a217b5..643c9b2 100644 --- a/dummyresource/dummycalendar.fbs +++ b/dummyresource/dummycalendar.fbs @@ -6,6 +6,7 @@ table DummyEvent { summary:string; description:string; attachment:[ubyte]; + remoteId:string; } root_type DummyEvent; diff --git a/dummyresource/facade.cpp b/dummyresource/facade.cpp index 0d47010..c2871bb 100644 --- a/dummyresource/facade.cpp +++ b/dummyresource/facade.cpp @@ -23,6 +23,7 @@ #include #include "common/resourceaccess.h" +#include "common/commands.h" #include "dummycalendar_generated.h" using namespace DummyCalendar; @@ -30,7 +31,7 @@ using namespace flatbuffers; DummyResourceFacade::DummyResourceFacade() : Akonadi2::StoreFacade(), - mResourceAccess(/* new ResourceAccess("dummyresource") */) + mResourceAccess(new Akonadi2::ResourceAccess("org.kde.dummy")) { // connect(mResourceAccess.data(), &ResourceAccess::ready, this, onReadyChanged); } @@ -95,11 +96,8 @@ public: QSharedPointer storage; }; -void DummyResourceFacade::load(const Akonadi2::Query &query, const std::function &resultCallback) +static std::function prepareQuery(const Akonadi2::Query &query) { - qDebug() << "load called"; - auto storage = QSharedPointer::create(Akonadi2::Store::storageLocation(), "dummyresource"); - //Compose some functions to make query matching fast. //This way we can process the query once, and convert all values into something that can be compared quickly std::function preparedQuery; @@ -125,21 +123,48 @@ void DummyResourceFacade::load(const Akonadi2::Query &query, const std::function return true; }; } + return preparedQuery; +} - //Because we have no indexes yet, we always do a full scan - storage->scan("", [=](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { - //TODO read second buffer as well - auto eventBuffer = GetDummyEvent(dataValue); - if (preparedQuery && preparedQuery(std::string(static_cast(keyValue), keySize), eventBuffer)) { - //TODO read the revision from the generic portion of the buffer - auto event = QSharedPointer::create("dummyresource", QString::fromUtf8(static_cast(keyValue), keySize), 0); - event->buffer = eventBuffer; - event->storage = storage; - resultCallback(event); - } - return true; +void DummyResourceFacade::synchronizeResource(const std::function &continuation) +{ + //TODO check if a sync is necessary + //TODO Only sync what was requested + //TODO timeout + mResourceAccess->open(); + mResourceAccess->synchronizeResource(continuation); +} + +void DummyResourceFacade::load(const Akonadi2::Query &query, const std::function &resultCallback, const std::function &completeCallback) +{ + qDebug() << "load called"; + + synchronizeResource([=]() { + //Now that the sync is complete we can execute the query + const auto preparedQuery = prepareQuery(query); + + auto storage = QSharedPointer::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); + + qDebug() << "executing query"; + //We start a transaction explicitly that we'll leave open so the values can be read. + //The transaction will be closed automatically once the storage object is destroyed. + storage->startTransaction(Akonadi2::Storage::ReadOnly); + //Because we have no indexes yet, we always do a full scan + storage->scan("", [=](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { + //TODO read the three buffers + qDebug() << QString::fromStdString(std::string(static_cast(keyValue), keySize)); + auto eventBuffer = GetDummyEvent(dataValue); + if (preparedQuery && preparedQuery(std::string(static_cast(keyValue), keySize), eventBuffer)) { + //TODO set proper revision + qint64 revision = 0; + auto event = QSharedPointer::create("org.kde.dummy", QString::fromUtf8(static_cast(keyValue), keySize), revision); + event->buffer = eventBuffer; + event->storage = storage; + resultCallback(event); + } + return true; + }); + completeCallback(); }); } -//TODO call in plugin loader -// Akonadi2::FacadeFactory::instance().registerFacade("dummyresource", [facade](){ return new DummyResourceFacade(facade); }); diff --git a/dummyresource/facade.h b/dummyresource/facade.h index f179c06..c76e62c 100644 --- a/dummyresource/facade.h +++ b/dummyresource/facade.h @@ -22,7 +22,9 @@ #include "common/clientapi.h" #include "common/storage.h" -class ResourceAccess; +namespace Akonadi2 { + class ResourceAccess; +} class DummyResourceFacade : public Akonadi2::StoreFacade { @@ -32,8 +34,9 @@ public: virtual void create(const Akonadi2::Domain::Event &domainObject); virtual void modify(const Akonadi2::Domain::Event &domainObject); virtual void remove(const Akonadi2::Domain::Event &domainObject); - virtual void load(const Akonadi2::Query &query, const std::function &resultCallback); + virtual void load(const Akonadi2::Query &query, const std::function &resultCallback, const std::function &completeCallback); private: - QSharedPointer mResourceAccess; + void synchronizeResource(const std::function &continuation); + QSharedPointer mResourceAccess; }; diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index bd85b4f..2c43981 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp @@ -20,22 +20,95 @@ #include "resourcefactory.h" #include "facade.h" #include "dummycalendar_generated.h" +#include + +static std::string createEvent() +{ + static const size_t attachmentSize = 1024*2; // 2KB + static uint8_t rawData[attachmentSize]; + static flatbuffers::FlatBufferBuilder fbb; + fbb.Clear(); + { + auto summary = fbb.CreateString("summary"); + auto data = fbb.CreateUninitializedVector(attachmentSize); + DummyCalendar::DummyEventBuilder eventBuilder(fbb); + eventBuilder.add_summary(summary); + eventBuilder.add_attachment(data); + auto eventLocation = eventBuilder.Finish(); + DummyCalendar::FinishDummyEventBuffer(fbb, eventLocation); + memcpy((void*)DummyCalendar::GetDummyEvent(fbb.GetBufferPointer())->attachment()->Data(), rawData, attachmentSize); + } + + return std::string(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()); +} + +QMap populate() +{ + QMap content; + for (int i = 0; i < 2; i++) { + auto event = createEvent(); + content.insert(QString("key%1").arg(i), QString::fromStdString(event)); + } + return content; +} + +static QMap s_dataSource = populate(); DummyResource::DummyResource() : Akonadi2::Resource() { +} +void findByRemoteId(QSharedPointer storage, const QString &rid, std::function callback) +{ + //TODO lookup in rid index instead of doing a full scan + const std::string ridString = rid.toStdString(); + storage->scan("", [&](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { + auto eventBuffer = DummyCalendar::GetDummyEvent(dataValue); + if (std::string(eventBuffer->remoteId()->c_str(), eventBuffer->remoteId()->size()) == ridString) { + callback(keyValue, keySize, dataValue, dataSize); + } + return true; + }); } void DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) { - // TODO actually populate the storage with new items - auto builder = DummyCalendar::DummyEventBuilder(m_fbb); - builder .add_summary(m_fbb.CreateString("summary summary!")); - auto buffer = builder.Finish(); - DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); - pipeline->newEntity("fakekey", m_fbb); - m_fbb.Clear(); + //TODO use a read-only transaction during the complete sync to sync against a defined revision + + qDebug() << "synchronize with source"; + + auto storage = QSharedPointer::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); + for (auto it = s_dataSource.constBegin(); it != s_dataSource.constEnd(); it++) { + bool isNew = true; + if (storage->exists()) { + findByRemoteId(storage, it.key(), [&](void *keyValue, int keySize, void *dataValue, int dataSize) { + isNew = false; + }); + } + + if (isNew) { + //TODO: perhaps it would be more convenient to populate the domain types? + //Resource specific parts are not accessible that way, but then we would only have to implement the property mapping in one place + const QByteArray data = it.value().toUtf8(); + auto eventBuffer = DummyCalendar::GetDummyEvent(data.data()); + + //Map the source format to the buffer format (which happens to be an exact copy here) + auto builder = DummyCalendar::DummyEventBuilder(m_fbb); + builder.add_summary(m_fbb.CreateString(eventBuffer->summary()->c_str())); + auto buffer = builder.Finish(); + DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); + + //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. + const auto key = QUuid::createUuid().toString().toUtf8(); + //TODO can we really just start populating the buffer and pass the buffer builder? + qDebug() << "new event"; + pipeline->newEntity(key, m_fbb); + } else { //modification + //TODO diff and create modification if necessary + } + } + //TODO find items to remove } void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 4c288e9..dcf2f21 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -17,7 +17,9 @@ manual_tests ( storagebenchmark storagetest dummyresourcefacadetest + dummyresourcetest ) target_link_libraries(dummyresourcefacadetest akonadi2_resource_dummy) +target_link_libraries(dummyresourcetest akonadi2_resource_dummy) diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp new file mode 100644 index 0000000..75d29de --- /dev/null +++ b/tests/dummyresourcetest.cpp @@ -0,0 +1,35 @@ +#include + +#include + +#include "common/resource.h" +#include "clientapi.h" + +class DummyResourceTest : public QObject +{ + Q_OBJECT +private Q_SLOTS: + void initTestCase() + { + auto factory = Akonadi2::ResourceFactory::load("org.kde.dummy"); + QVERIFY(factory); + } + + void cleanupTestCase() + { + } + + void testSync() + { + Akonadi2::Query query; + query.resources << "org.kde.dummy"; + + async::SyncListResult result(Akonadi2::Store::load(query)); + result.exec(); + QVERIFY(!result.isEmpty()); + } + +}; + +QTEST_MAIN(DummyResourceTest) +#include "dummyresourcetest.moc" -- cgit v1.2.3