From d80ff84c28c0be626c1df4528741cddf5a55f547 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 21 Dec 2014 22:20:31 +0100 Subject: Write-Read loop from clientside. It's a huge hack but starts to show results. Most urgently we need: * reliable command results * the 3 buffers instead of the 1 * A way to implement storage as preprocessor (or a place to impelement it after the preprocessors). --- common/clientapi.h | 19 +++++++++++++++---- common/pipeline.cpp | 7 ++++++- common/resourceaccess.cpp | 20 ++++++++++++++++++-- common/resourceaccess.h | 1 + common/storage.h | 5 +++++ common/storage_common.cpp | 19 +++++++++++++++++++ common/storage_lmdb.cpp | 16 ++++++++++------ common/test/clientapitest.cpp | 3 ++- 8 files changed, 76 insertions(+), 14 deletions(-) (limited to 'common') diff --git a/common/clientapi.h b/common/clientapi.h index 6054130..d2757e7 100644 --- a/common/clientapi.h +++ b/common/clientapi.h @@ -278,7 +278,7 @@ public: virtual void create(const DomainType &domainObject) = 0; virtual void modify(const DomainType &domainObject) = 0; virtual void remove(const DomainType &domainObject) = 0; - virtual void load(const Query &query, const std::function &resultCallback) = 0; + virtual void load(const Query &query, const std::function &resultCallback, const std::function &completeCallback) = 0; }; @@ -353,7 +353,7 @@ class Store { public: static QString storageLocation() { - return QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2"; + return QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage"; } /** @@ -371,13 +371,24 @@ public: // Query all resources and aggregate results // query tells us in which resources we're interested // TODO: queries to individual resources could be parallelized + auto eventloop = QSharedPointer::create(); + int completeCounter = 0; for(const QString &resource : query.resources) { auto facade = FacadeFactory::instance().getFacade(resource); //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive. std::function addCallback = std::bind(&ResultProvider::add, resultSet, std::placeholders::_1); - facade->load(query, addCallback); + //We copy the facade pointer to keep it alive + facade->load(query, addCallback, [&completeCounter, &query, resultSet, facade, eventloop]() { + //TODO use jobs instead of this counter + completeCounter++; + if (completeCounter == query.resources.size()) { + resultSet->complete(); + eventloop->quit(); + } + }); } - resultSet->complete(); + //The thread contains no eventloop, so execute one here + eventloop->exec(QEventLoop::ExcludeUserInputEvents); }); return resultSet->emitter(); } diff --git a/common/pipeline.cpp b/common/pipeline.cpp index cf508c5..739909d 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -31,7 +31,7 @@ class Pipeline::Private { public: Private(const QString &resourceName) - : storage(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceName), + : storage(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceName, Storage::ReadWrite), stepScheduled(false) { } @@ -71,6 +71,11 @@ void Pipeline::null() void Pipeline::newEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entity) { + const qint64 newRevision = storage().maxRevision() + 1; + //FIXME this should go into a preprocessor + storage().write(key, key.size(), reinterpret_cast(entity.GetBufferPointer()), entity.GetSize()); + storage().setMaxRevision(newRevision); + PipelineState state(this, NewPipeline, key, d->newPipeline); d->activePipelines << state; state.step(); diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index a7e14f5..1706ac4 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp @@ -58,7 +58,7 @@ public: void write(QIODevice *device, uint messageId) { - Console::main()->log(QString("\tSending queued command %1").arg(m_commandId)); + // Console::main()->log(QString("\tSending queued command %1").arg(m_commandId)); Commands::write(device, messageId, m_commandId, m_buffer, m_bufferSize); } @@ -82,6 +82,7 @@ public: QByteArray partialMessageBuffer; flatbuffers::FlatBufferBuilder fbb; QVector commandQueue; + QVector > synchronizeResultHandler; uint messageId; }; @@ -149,6 +150,13 @@ void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder & } } +void ResourceAccess::synchronizeResource(const std::function &resultHandler) +{ + sendCommand(Commands::SynchronizeCommand); + //TODO: this should be implemented as a job, so we don't need to store the result handler as member + d->synchronizeResultHandler << resultHandler; +} + void ResourceAccess::open() { if (d->socket->isValid()) { @@ -262,6 +270,13 @@ bool ResourceAccess::processMessageBuffer() auto buffer = GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); log(QString("Revision updated to: %1").arg(buffer->revision())); emit revisionChanged(buffer->revision()); + + //FIXME: The result handler should be called on completion of the synchronize command, and not upon arbitrary revision updates. + for(auto handler : d->synchronizeResultHandler) { + //FIXME: we should associate the handler with a buffer->id() to avoid prematurely triggering the result handler from a delayed synchronized response (this is relevant for on-demand syncing). + handler(); + } + d->synchronizeResultHandler.clear(); break; } case Commands::CommandCompletion: { @@ -280,7 +295,8 @@ bool ResourceAccess::processMessageBuffer() void ResourceAccess::log(const QString &message) { - Console::main()->log(d->resourceName + ": " + message); + qDebug() << d->resourceName + ": " + message; + // Console::main()->log(d->resourceName + ": " + message); } } diff --git a/common/resourceaccess.h b/common/resourceaccess.h index 3a35af6..7416b25 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h @@ -42,6 +42,7 @@ public: void sendCommand(int commandId); void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); + void synchronizeResource(const std::function &resultHandler); public Q_SLOTS: void open(); diff --git a/common/storage.h b/common/storage.h index 57ee56c..6cfa3d6 100644 --- a/common/storage.h +++ b/common/storage.h @@ -71,6 +71,11 @@ public: static std::function basicErrorHandler(); qint64 diskUsage() const; void removeFromDisk() const; + + qint64 maxRevision(); + void setMaxRevision(qint64 revision); + + bool exists() const; private: class Private; Private * const d; diff --git a/common/storage_common.cpp b/common/storage_common.cpp index 8f465fc..bdae9dd 100644 --- a/common/storage_common.cpp +++ b/common/storage_common.cpp @@ -52,4 +52,23 @@ void Storage::scan(const std::string &sKey, const std::function bool { + r = QString::fromStdString(revision).toLongLong(); + return false; + }, + [](const Storage::Error &error) { + //Ignore the error in case we don't find the value + //TODO only ignore value not found errors + }); + return r; +} + } // namespace Akonadi2 diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp index a8ec378..eeb0045 100644 --- a/common/storage_lmdb.cpp +++ b/common/storage_lmdb.cpp @@ -59,6 +59,7 @@ QMutex Storage::Private::sMutex; Storage::Private::Private(const QString &s, const QString &n, AccessMode m) : storageRoot(s), name(n), + env(0), transaction(0), mode(m), readTransaction(false), @@ -66,7 +67,7 @@ Storage::Private::Private(const QString &s, const QString &n, AccessMode m) { const QString fullPath(storageRoot + '/' + name); QDir dir; - dir.mkdir(storageRoot); + dir.mkpath(storageRoot); dir.mkdir(fullPath); //This seems to resolve threading related issues, not sure why though @@ -97,8 +98,10 @@ Storage::Private::~Private() } // it is still there and still unused, so we can shut it down - mdb_dbi_close(env, dbi); - mdb_env_close(env); + if (env) { + mdb_dbi_close(env, dbi); + mdb_env_close(env); + } } Storage::Storage(const QString &storageRoot, const QString &name, AccessMode mode) @@ -111,6 +114,10 @@ Storage::~Storage() delete d; } +bool Storage::exists() const +{ + return (d->env != 0); +} bool Storage::isInTransaction() const { return d->transaction; @@ -313,12 +320,9 @@ void Storage::scan(const char *keyData, uint keySize, errorHandler(error); } - /** - we don't abort the transaction since we need it for reading the values if (implicitTransaction) { abortTransaction(); } - */ } qint64 Storage::diskUsage() const diff --git a/common/test/clientapitest.cpp b/common/test/clientapitest.cpp index 2d1c238..dd634f1 100644 --- a/common/test/clientapitest.cpp +++ b/common/test/clientapitest.cpp @@ -11,12 +11,13 @@ public: virtual void create(const Akonadi2::Domain::Event &domainObject){}; virtual void modify(const Akonadi2::Domain::Event &domainObject){}; virtual void remove(const Akonadi2::Domain::Event &domainObject){}; - virtual void load(const Akonadi2::Query &query, const std::function &resultCallback) + virtual void load(const Akonadi2::Query &query, const std::function &resultCallback, const std::function &completeCallback) { qDebug() << "load called"; for(const auto &result : results) { resultCallback(result); } + completeCallback(); } QList results; -- cgit v1.2.3