From 9b2257d680a5e4fa2fda8cf3302f25054a06710e Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 28 Dec 2014 14:44:50 +0100 Subject: Buffers wrapped into entity buffer, async command progress tracking. --- async/src/CMakeLists.txt | 7 +- common/CMakeLists.txt | 7 +- common/entity.fbs | 9 +++ common/entitybuffer.cpp | 53 +++++++++++++ common/entitybuffer.fbs | 9 --- common/entitybuffer.h | 22 ++++++ common/pipeline.cpp | 29 +++---- common/pipeline.h | 1 + common/resource.cpp | 6 +- common/resource.h | 3 +- common/resourceaccess.cpp | 96 +++++++++++++---------- common/resourceaccess.h | 8 +- dummyresource/facade.cpp | 48 ++++++++++-- dummyresource/resourcefactory.cpp | 94 +++++++++++++---------- dummyresource/resourcefactory.h | 3 +- synchronizer/listener.cpp | 155 ++++++++++++++++++++------------------ synchronizer/listener.h | 6 +- tests/dummyresourcefacadetest.cpp | 15 +--- tests/dummyresourcetest.cpp | 15 +++- 19 files changed, 378 insertions(+), 208 deletions(-) create mode 100644 common/entity.fbs create mode 100644 common/entitybuffer.cpp delete mode 100644 common/entitybuffer.fbs create mode 100644 common/entitybuffer.h diff --git a/async/src/CMakeLists.txt b/async/src/CMakeLists.txt index a371da0..85700ed 100644 --- a/async/src/CMakeLists.txt +++ b/async/src/CMakeLists.txt @@ -1,3 +1,5 @@ +project(akonadi2async) + include_directories(${CMAKE_CURRENT_BINARY_DIR}) set(async_SRCS @@ -5,5 +7,6 @@ set(async_SRCS future.cpp ) -add_library(akonadi2async SHARED ${async_SRCS}) -target_link_libraries(akonadi2async Qt5::Core) +add_library(${PROJECT_NAME} SHARED ${async_SRCS}) +target_link_libraries(${PROJECT_NAME} Qt5::Core) +install(TARGETS ${PROJECT_NAME} DESTINATION lib) diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index ec13e07..1a9a812 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -8,7 +8,7 @@ generate_flatbuffers( commands/modifyentity commands/revisionupdate domain/event - entitybuffer + entity metadata ) @@ -17,10 +17,11 @@ if (STORAGE_unqlite) set(storage_SRCS unqlite/unqlite.c storage_unqlite.cpp) else (STORAGE_unqlite) set(storage_SRCS storage_lmdb.cpp) - set(storage_LIBS lmdb) + set(storage_LIBS ${lmdb}) endif (STORAGE_unqlite) set(command_SRCS + entitybuffer.cpp clientapi.cpp commands.cpp console.cpp @@ -35,7 +36,7 @@ add_library(${PROJECT_NAME} SHARED ${command_SRCS}) generate_export_header(${PROJECT_NAME} BASE_NAME Akonadi2Common EXPORT_FILE_NAME akonadi2common_export.h) SET_TARGET_PROPERTIES(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX) qt5_use_modules(${PROJECT_NAME} Widgets Network) -target_link_libraries(${PROJECT_NAME} ${storage_LIBS}) +target_link_libraries(${PROJECT_NAME} ${storage_LIBS} akonadi2async) install(TARGETS ${PROJECT_NAME} DESTINATION lib) add_subdirectory(test) diff --git a/common/entity.fbs b/common/entity.fbs new file mode 100644 index 0000000..565b1a7 --- /dev/null +++ b/common/entity.fbs @@ -0,0 +1,9 @@ +namespace Akonadi2; + +table Entity { + metadata: [ubyte]; + resource: [ubyte]; + local: [ubyte]; +} + +root_type Entity; diff --git a/common/entitybuffer.cpp b/common/entitybuffer.cpp new file mode 100644 index 0000000..a78e91d --- /dev/null +++ b/common/entitybuffer.cpp @@ -0,0 +1,53 @@ +#include "entitybuffer.h" + +#include "entity_generated.h" +#include "metadata_generated.h" +#include + +using namespace Akonadi2; + +EntityBuffer::EntityBuffer(void *dataValue, int dataSize) + : mEntity(nullptr) +{ + flatbuffers::Verifier verifyer(reinterpret_cast(dataValue), dataSize); + // Q_ASSERT(Akonadi2::VerifyEntity(verifyer)); + if (!Akonadi2::VerifyEntityBuffer(verifyer)) { + qWarning() << "invalid buffer"; + } else { + mEntity = Akonadi2::GetEntity(dataValue); + } +} + +const flatbuffers::Vector* EntityBuffer::resourceBuffer() +{ + if (!mEntity) { + qDebug() << "no buffer"; + return nullptr; + } + return mEntity->resource(); +} + +const flatbuffers::Vector* EntityBuffer::metadataBuffer() +{ + if (!mEntity) { + return nullptr; + } + return mEntity->metadata(); +} + +const flatbuffers::Vector* EntityBuffer::localBuffer() +{ + if (!mEntity) { + return nullptr; + } + return mEntity->local(); +} + +void EntityBuffer::extractResourceBuffer(void *dataValue, int dataSize, const std::function *)> &handler) +{ + Akonadi2::EntityBuffer buffer(dataValue, dataSize); + if (auto resourceData = buffer.resourceBuffer()) { + handler(resourceData); + } +} + diff --git a/common/entitybuffer.fbs b/common/entitybuffer.fbs deleted file mode 100644 index 28c9b2a..0000000 --- a/common/entitybuffer.fbs +++ /dev/null @@ -1,9 +0,0 @@ -namespace Akonadi2; - -table EntityBuffer { - metadata: [ubyte]; - resource: [ubyte]; - local: [ubyte]; -} - -root_type EntityBuffer; diff --git a/common/entitybuffer.h b/common/entitybuffer.h new file mode 100644 index 0000000..2a7150e --- /dev/null +++ b/common/entitybuffer.h @@ -0,0 +1,22 @@ +#pragma once + +#include +#include + +namespace Akonadi2 { +class Entity; + +class EntityBuffer { +public: + EntityBuffer(void *dataValue, int size); + const flatbuffers::Vector *resourceBuffer(); + const flatbuffers::Vector *metadataBuffer(); + const flatbuffers::Vector *localBuffer(); + + static void extractResourceBuffer(void *dataValue, int dataSize, const std::function *)> &handler); +private: + const Entity *mEntity; +}; + +} + diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 5ca8b95..dc6d389 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -24,7 +24,7 @@ #include #include #include -#include "entitybuffer_generated.h" +#include "entity_generated.h" #include "metadata_generated.h" namespace Akonadi2 @@ -76,33 +76,31 @@ void Pipeline::newEntity(const QByteArray &key, void *resourceBufferData, size_t { const qint64 newRevision = storage().maxRevision() + 1; - flatbuffers::FlatBufferBuilder fbb; - auto builder = Akonadi2::EntityBufferBuilder(fbb); + std::vector metadataData; //Add metadata buffer { flatbuffers::FlatBufferBuilder metadataFbb; auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); metadataBuilder.add_revision(newRevision); auto metadataBuffer = metadataBuilder.Finish(); - Akonadi2::FinishMetadataBuffer(fbb, metadataBuffer); - //TODO use memcpy - auto metadata = fbb.CreateVector(metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); - builder.add_metadata(metadata); + Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); + metadataData.resize(metadataFbb.GetSize()); + std::copy_n(metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), back_inserter(metadataData)); } - //Add resource buffer - { - //TODO use memcpy - auto resource = fbb.CreateVector(static_cast(resourceBufferData), size); - builder.add_resource(resource); - } + flatbuffers::FlatBufferBuilder fbb; + auto metadata = fbb.CreateVector(metadataData.data(), metadataData.size()); + auto resource = fbb.CreateVector(static_cast(resourceBufferData), size); + auto builder = Akonadi2::EntityBuilder(fbb); + builder.add_metadata(metadata); + builder.add_resource(resource); //We don't have a local buffer yet // builder.add_local(); auto buffer = builder.Finish(); - Akonadi2::FinishEntityBufferBuffer(fbb, buffer); + Akonadi2::FinishEntityBuffer(fbb, buffer); qDebug() << "writing new entity" << key; storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); @@ -163,6 +161,9 @@ void Pipeline::pipelineCompleted(const PipelineState &state) emit revisionUpdated(); } scheduleStep(); + if (d->activePipelines.isEmpty()) { + emit pipelinesDrained(); + } } diff --git a/common/pipeline.h b/common/pipeline.h index 159cc1c..6ef8703 100644 --- a/common/pipeline.h +++ b/common/pipeline.h @@ -55,6 +55,7 @@ public: Q_SIGNALS: void revisionUpdated(); + void pipelinesDrained(); private Q_SLOTS: void stepPipelines(); diff --git a/common/resource.cpp b/common/resource.cpp index ae28485..bba6609 100644 --- a/common/resource.cpp +++ b/common/resource.cpp @@ -48,9 +48,11 @@ void Resource::processCommand(int commandId, const QByteArray &data, uint size, pipeline->null(); } -void Resource::synchronizeWithSource(Pipeline *pipeline) +Async::Job Resource::synchronizeWithSource(Pipeline *pipeline) { - pipeline->null(); + return Async::start([pipeline](Async::Future &f) { + pipeline->null(); + }); } class ResourceFactory::Private diff --git a/common/resource.h b/common/resource.h index 0f65e1f..fb42c1b 100644 --- a/common/resource.h +++ b/common/resource.h @@ -21,6 +21,7 @@ #include #include #include +#include namespace Akonadi2 { @@ -33,7 +34,7 @@ public: virtual ~Resource(); virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); - virtual void synchronizeWithSource(Pipeline *pipeline); + virtual Async::Job synchronizeWithSource(Pipeline *pipeline); private: class Private; diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 1706ac4..31b9e79 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp @@ -37,38 +37,36 @@ namespace Akonadi2 class QueuedCommand { public: - QueuedCommand(int commandId) - : m_commandId(commandId), - m_bufferSize(0), - m_buffer(0) + QueuedCommand(int commandId, const std::function &callback) + : commandId(commandId), + bufferSize(0), + buffer(0), + callback(callback) {} - QueuedCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) - : m_commandId(commandId), - m_bufferSize(fbb.GetSize()), - m_buffer(new char[m_bufferSize]) + QueuedCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function &callback) + : commandId(commandId), + bufferSize(fbb.GetSize()), + buffer(new char[bufferSize]), + callback(callback) { - memcpy(m_buffer, fbb.GetBufferPointer(), m_bufferSize); + memcpy(buffer, fbb.GetBufferPointer(), bufferSize); } ~QueuedCommand() { - delete[] m_buffer; - } - - void write(QIODevice *device, uint messageId) - { - // Console::main()->log(QString("\tSending queued command %1").arg(m_commandId)); - Commands::write(device, messageId, m_commandId, m_buffer, m_bufferSize); + delete[] buffer; } private: QueuedCommand(const QueuedCommand &other); QueuedCommand &operator=(const QueuedCommand &rhs); - const int m_commandId; - const uint m_bufferSize; - char *m_buffer; +public: + const int commandId; + const uint bufferSize; + char *buffer; + std::function callback; }; class ResourceAccess::Private @@ -82,7 +80,7 @@ public: QByteArray partialMessageBuffer; flatbuffers::FlatBufferBuilder fbb; QVector commandQueue; - QVector > synchronizeResultHandler; + QMultiMap > resultHandler; uint messageId; }; @@ -130,31 +128,42 @@ bool ResourceAccess::isReady() const return d->socket->isValid(); } -void ResourceAccess::sendCommand(int commandId) +void ResourceAccess::registerCallback(uint messageId, const std::function &callback) +{ + d->resultHandler.insert(messageId, callback); +} + +void ResourceAccess::sendCommand(int commandId, const std::function &callback) { if (isReady()) { log(QString("Sending command %1").arg(commandId)); - Commands::write(d->socket, ++d->messageId, commandId); + d->messageId++; + if (callback) { + registerCallback(d->messageId, callback); + } + Commands::write(d->socket, d->messageId, commandId); } else { - d->commandQueue << new QueuedCommand(commandId); + d->commandQueue << new QueuedCommand(commandId, callback); } } -void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) +void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function &callback) { if (isReady()) { log(QString("Sending command %1").arg(commandId)); - Commands::write(d->socket, ++d->messageId, commandId, fbb); + d->messageId++; + if (callback) { + registerCallback(d->messageId, callback); + } + Commands::write(d->socket, d->messageId, commandId, fbb); } else { - d->commandQueue << new QueuedCommand(commandId, fbb); + d->commandQueue << new QueuedCommand(commandId, fbb, callback); } } void ResourceAccess::synchronizeResource(const std::function &resultHandler) { - sendCommand(Commands::SynchronizeCommand); - //TODO: this should be implemented as a job, so we don't need to store the result handler as member - d->synchronizeResultHandler << resultHandler; + sendCommand(Commands::SynchronizeCommand, resultHandler); } void ResourceAccess::open() @@ -200,7 +209,12 @@ void ResourceAccess::connected() //TODO: serialize instead of blast them all through the socket? log(QString("We have %1 queued commands").arg(d->commandQueue.size())); for (QueuedCommand *command: d->commandQueue) { - command->write(d->socket, ++d->messageId); + d->messageId++; + log(QString("Sending command %1").arg(command->commandId)); + if (command->callback) { + registerCallback(d->messageId, command->callback); + } + Commands::write(d->socket, d->messageId, command->commandId, command->buffer, command->bufferSize); delete command; } d->commandQueue.clear(); @@ -234,6 +248,8 @@ void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error) if (!d->tryOpenTimer->isActive()) { d->tryOpenTimer->start(); } + } else { + qWarning() << "Failed to start resource"; } } @@ -256,8 +272,7 @@ bool ResourceAccess::processMessageBuffer() return false; } - //messageId is unused, so commented out - //const uint messageId = *(int*)(d->partialMessageBuffer.constData()); + const uint messageId = *(int*)(d->partialMessageBuffer.constData()); const int commandId = *(int*)(d->partialMessageBuffer.constData() + sizeof(uint)); const uint size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint)); @@ -271,18 +286,15 @@ bool ResourceAccess::processMessageBuffer() log(QString("Revision updated to: %1").arg(buffer->revision())); emit revisionChanged(buffer->revision()); - //FIXME: The result handler should be called on completion of the synchronize command, and not upon arbitrary revision updates. - for(auto handler : d->synchronizeResultHandler) { - //FIXME: we should associate the handler with a buffer->id() to avoid prematurely triggering the result handler from a delayed synchronized response (this is relevant for on-demand syncing). - handler(); - } - d->synchronizeResultHandler.clear(); break; } case Commands::CommandCompletion: { auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); log(QString("Command %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc + + //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first + QMetaObject::invokeMethod(this, "callCallbacks", Qt::QueuedConnection, QGenericReturnArgument(), Q_ARG(int, buffer->id())); break; } default: @@ -293,6 +305,14 @@ bool ResourceAccess::processMessageBuffer() return d->partialMessageBuffer.size() >= headerSize; } +void ResourceAccess::callCallbacks(int id) +{ + for(auto handler : d->resultHandler.values(id)) { + handler(); + } + d->resultHandler.remove(id); +} + void ResourceAccess::log(const QString &message) { qDebug() << d->resourceName + ": " + message; diff --git a/common/resourceaccess.h b/common/resourceaccess.h index 7416b25..d79c993 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h @@ -40,8 +40,9 @@ public: QString resourceName() const; bool isReady() const; - void sendCommand(int commandId); - void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); + //TODO use jobs + void sendCommand(int commandId, const std::function &callback = std::function()); + void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function &callback); void synchronizeResource(const std::function &resultHandler); public Q_SLOTS: @@ -51,6 +52,7 @@ public Q_SLOTS: Q_SIGNALS: void ready(bool isReady); void revisionChanged(unsigned long long revision); + void commandCompleted(); private Q_SLOTS: //TODO: move these to the Private class @@ -59,9 +61,11 @@ private Q_SLOTS: void connectionError(QLocalSocket::LocalSocketError error); void readResourceMessage(); bool processMessageBuffer(); + void callCallbacks(int id); private: void log(const QString &message); + void registerCallback(uint messageId, const std::function &callback); class Private; Private * const d; diff --git a/dummyresource/facade.cpp b/dummyresource/facade.cpp index 458aba6..d3974e9 100644 --- a/dummyresource/facade.cpp +++ b/dummyresource/facade.cpp @@ -26,8 +26,9 @@ #include "common/commands.h" #include "dummycalendar_generated.h" #include "event_generated.h" -#include "entitybuffer_generated.h" +#include "entity_generated.h" #include "metadata_generated.h" +#include using namespace DummyCalendar; using namespace flatbuffers; @@ -199,14 +200,47 @@ void DummyResourceFacade::load(const Akonadi2::Query &query, const std::function storage->startTransaction(Akonadi2::Storage::ReadOnly); //Because we have no indexes yet, we always do a full scan storage->scan("", [=](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { - qDebug() << QString::fromStdString(std::string(static_cast(keyValue), keySize)); - auto buffer = Akonadi2::GetEntityBuffer(dataValue); - auto resourceBuffer = GetDummyEvent(buffer->resource()); - auto metadataBuffer = Akonadi2::GetMetadata(buffer->resource()); - auto localBuffer = Akonadi2::Domain::Buffer::GetEvent(buffer->local()); + + //Skip internals + if (QByteArray::fromRawData(static_cast(keyValue), keySize).startsWith("__internal")) { + return true; + } + + //Extract buffers + Akonadi2::EntityBuffer buffer(dataValue, dataSize); + + DummyEvent const *resourceBuffer = 0; + if (auto resourceData = buffer.resourceBuffer()) { + flatbuffers::Verifier verifyer(resourceData->Data(), resourceData->size()); + if (VerifyDummyEventBuffer(verifyer)) { + resourceBuffer = GetDummyEvent(resourceData); + } + } + + Akonadi2::Metadata const *metadataBuffer = 0; + if (auto metadataData = buffer.metadataBuffer()) { + flatbuffers::Verifier verifyer(metadataData->Data(), metadataData->size()); + if (Akonadi2::VerifyMetadataBuffer(verifyer)) { + metadataBuffer = Akonadi2::GetMetadata(metadataData); + } + } + + Akonadi2::Domain::Buffer::Event const *localBuffer = 0; + if (auto localData = buffer.localBuffer()) { + flatbuffers::Verifier verifyer(localData->Data(), localData->size()); + if (Akonadi2::Domain::Buffer::VerifyEventBuffer(verifyer)) { + localBuffer = Akonadi2::Domain::Buffer::GetEvent(localData); + } + } + + if (!resourceBuffer || !metadataBuffer) { + qWarning() << "invalid buffer " << QString::fromStdString(std::string(static_cast(keyValue), keySize)); + return true; + } + //We probably only want to create all buffers after the scan if (preparedQuery && preparedQuery(std::string(static_cast(keyValue), keySize), resourceBuffer)) { - qint64 revision = metadataBuffer->revision(); + qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; auto adaptor = QSharedPointer::create(); adaptor->mLocalBuffer = localBuffer; adaptor->mResourceBuffer = resourceBuffer; diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index 6b93985..c9e4d7a 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp @@ -19,7 +19,9 @@ #include "resourcefactory.h" #include "facade.h" +#include "entitybuffer.h" #include "dummycalendar_generated.h" +#include "metadata_generated.h" #include static std::string createEvent() @@ -64,51 +66,67 @@ void findByRemoteId(QSharedPointer storage, const QString &ri //TODO lookup in rid index instead of doing a full scan const std::string ridString = rid.toStdString(); storage->scan("", [&](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { - auto eventBuffer = DummyCalendar::GetDummyEvent(dataValue); - if (std::string(eventBuffer->remoteId()->c_str(), eventBuffer->remoteId()->size()) == ridString) { - callback(keyValue, keySize, dataValue, dataSize); + if (QByteArray::fromRawData(static_cast(keyValue), keySize).startsWith("__internal")) { + return true; } + + Akonadi2::EntityBuffer::extractResourceBuffer(dataValue, dataSize, [&](const flatbuffers::Vector *buffer) { + flatbuffers::Verifier verifier(buffer->Data(), buffer->size()); + if (DummyCalendar::VerifyDummyEventBuffer(verifier)) { + DummyCalendar::DummyEvent const *resourceBuffer = DummyCalendar::GetDummyEvent(buffer->Data()); + if (resourceBuffer && resourceBuffer->remoteId()) { + if (std::string(resourceBuffer->remoteId()->c_str(), resourceBuffer->remoteId()->size()) == ridString) { + callback(keyValue, keySize, dataValue, dataSize); + } + } + } + }); return true; }); } -void DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) +Async::Job DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) { - //TODO use a read-only transaction during the complete sync to sync against a defined revision - - qDebug() << "synchronize with source"; - - auto storage = QSharedPointer::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); - for (auto it = s_dataSource.constBegin(); it != s_dataSource.constEnd(); it++) { - bool isNew = true; - if (storage->exists()) { - findByRemoteId(storage, it.key(), [&](void *keyValue, int keySize, void *dataValue, int dataSize) { - isNew = false; - }); - } - - if (isNew) { - //TODO: perhaps it would be more convenient to populate the domain types? - //Resource specific parts are not accessible that way, but then we would only have to implement the property mapping in one place - const QByteArray data = it.value().toUtf8(); - auto eventBuffer = DummyCalendar::GetDummyEvent(data.data()); - - //Map the source format to the buffer format (which happens to be an exact copy here) - auto builder = DummyCalendar::DummyEventBuilder(m_fbb); - builder.add_summary(m_fbb.CreateString(eventBuffer->summary()->c_str())); - auto buffer = builder.Finish(); - DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); - - //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. - const auto key = QUuid::createUuid().toString().toUtf8(); - //TODO can we really just start populating the buffer and pass the buffer builder? - qDebug() << "new event"; - pipeline->newEntity(key, m_fbb.GetBufferPointer(), m_fbb.GetSize()); - } else { //modification - //TODO diff and create modification if necessary + return Async::start([this, pipeline](Async::Future &f) { + //TODO use a read-only transaction during the complete sync to sync against a defined revision + auto storage = QSharedPointer::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); + for (auto it = s_dataSource.constBegin(); it != s_dataSource.constEnd(); it++) { + bool isNew = true; + if (storage->exists()) { + findByRemoteId(storage, it.key(), [&](void *keyValue, int keySize, void *dataValue, int dataSize) { + isNew = false; + }); + } + if (isNew) { + m_fbb.Clear(); + + const QByteArray data = it.value().toUtf8(); + auto eventBuffer = DummyCalendar::GetDummyEvent(data.data()); + + //Map the source format to the buffer format (which happens to be an exact copy here) + auto summary = m_fbb.CreateString(eventBuffer->summary()->c_str()); + auto rid = m_fbb.CreateString(it.key().toStdString().c_str()); + auto description = m_fbb.CreateString(it.key().toStdString().c_str()); + static uint8_t rawData[100]; + auto attachment = m_fbb.CreateVector(rawData, 100); + + auto builder = DummyCalendar::DummyEventBuilder(m_fbb); + builder.add_summary(summary); + builder.add_remoteId(rid); + builder.add_description(description); + builder.add_attachment(attachment); + auto buffer = builder.Finish(); + DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); + //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. + const auto key = QUuid::createUuid().toString().toUtf8(); + pipeline->newEntity(key, m_fbb.GetBufferPointer(), m_fbb.GetSize()); + } else { //modification + //TODO diff and create modification if necessary + } } - } - //TODO find items to remove + //TODO find items to remove + f.setFinished(); + }); } void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) diff --git a/dummyresource/resourcefactory.h b/dummyresource/resourcefactory.h index 807a654..dba674f 100644 --- a/dummyresource/resourcefactory.h +++ b/dummyresource/resourcefactory.h @@ -20,6 +20,7 @@ #pragma once #include "common/resource.h" +#include "async/src/async.h" #include @@ -30,7 +31,7 @@ class DummyResource : public Akonadi2::Resource { public: DummyResource(); - void synchronizeWithSource(Akonadi2::Pipeline *pipeline); + Async::Job synchronizeWithSource(Akonadi2::Pipeline *pipeline); void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline); private: diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index 328d4d6..8e94213 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp @@ -35,7 +35,6 @@ Listener::Listener(const QString &resourceName, QObject *parent) : QObject(parent), m_server(new QLocalServer(this)), - m_revision(0), m_resourceName(resourceName), m_resource(0), m_pipeline(new Akonadi2::Pipeline(resourceName, parent)), @@ -46,18 +45,18 @@ Listener::Listener(const QString &resourceName, QObject *parent) this, &Listener::refreshRevision); connect(m_server, &QLocalServer::newConnection, this, &Listener::acceptConnection); - Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resourceName)); + log(QString("Trying to open %1").arg(resourceName)); if (!m_server->listen(resourceName)) { // FIXME: multiple starts need to be handled here m_server->removeServer(resourceName); if (!m_server->listen(resourceName)) { - Akonadi2::Console::main()->log("Utter failure to start server"); + log("Utter failure to start server"); exit(-1); } } if (m_server->isListening()) { - Akonadi2::Console::main()->log(QString("Listening on %1").arg(m_server->serverName())); + log(QString("Listening on %1").arg(m_server->serverName())); } //TODO: experiment with different timeouts @@ -73,19 +72,6 @@ Listener::~Listener() { } -void Listener::setRevision(unsigned long long revision) -{ - if (m_revision != revision) { - m_revision = revision; - updateClientsWithRevision(); - } -} - -unsigned long long Listener::revision() const -{ - return m_revision; -} - void Listener::closeAllConnections() { for (Client &client: m_connections) { @@ -101,14 +87,14 @@ void Listener::closeAllConnections() void Listener::acceptConnection() { - Akonadi2::Console::main()->log(QString("Accepting connection")); + log(QString("Accepting connection")); QLocalSocket *socket = m_server->nextPendingConnection(); if (!socket) { return; } - Akonadi2::Console::main()->log("Got a connection"); + log("Got a connection"); Client client("Unknown Client", socket); connect(socket, &QIODevice::readyRead, this, &Listener::readFromSocket); @@ -125,12 +111,12 @@ void Listener::clientDropped() return; } - Akonadi2::Console::main()->log("Dropping connection..."); + log("Dropping connection..."); QMutableVectorIterator it(m_connections); while (it.hasNext()) { const Client &client = it.next(); if (client.socket == socket) { - Akonadi2::Console::main()->log(QString(" dropped... %1").arg(client.name)); + log(QString(" dropped... %1").arg(client.name)); it.remove(); break; } @@ -154,7 +140,7 @@ void Listener::readFromSocket() return; } - Akonadi2::Console::main()->log("Reading from socket..."); + log("Reading from socket..."); for (Client &client: m_connections) { if (client.socket == socket) { client.commandBuffer += socket->readAll(); @@ -188,6 +174,57 @@ void Listener::processClientBuffers() } } +void Listener::processCommand(int commandId, uint messageId, Client &client, uint size, const std::function &callback) +{ + switch (commandId) { + case Akonadi2::Commands::HandshakeCommand: { + flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); + if (Akonadi2::VerifyHandshakeBuffer(verifier)) { + auto buffer = Akonadi2::GetHandshake(client.commandBuffer.constData()); + client.name = buffer->name()->c_str(); + sendCurrentRevision(client); + } else { + qWarning() << "received invalid command"; + } + break; + } + case Akonadi2::Commands::SynchronizeCommand: { + log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); + loadResource(); + if (m_resource) { + qDebug() << "synchronizing"; + m_resource->synchronizeWithSource(m_pipeline).then([callback](Async::Future &f){ + //FIXME: the command is complete once all pipelines have been drained, track progress and async emit result + callback(); + f.setFinished(); + }).exec(); + return; + } else { + qWarning() << "No resource loaded"; + } + break; + } + case Akonadi2::Commands::FetchEntityCommand: + case Akonadi2::Commands::DeleteEntityCommand: + case Akonadi2::Commands::ModifyEntityCommand: + case Akonadi2::Commands::CreateEntityCommand: + log(QString("\tCommand id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); + m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); + break; + default: + if (commandId > Akonadi2::Commands::CustomCommand) { + loadResource(); + if (m_resource) { + m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); + } + } else { + //TODO: handle error: we don't know wtf this command is + } + break; + } + callback(); +} + bool Listener::processClientBuffer(Client &client) { static const int headerSize = Akonadi2::Commands::headerSize(); @@ -195,58 +232,22 @@ bool Listener::processClientBuffer(Client &client) return false; } - int commandId; - uint messageId, size; - messageId = *(uint*)client.commandBuffer.constData(); - commandId = *(int*)(client.commandBuffer.constData() + sizeof(uint)); - size = *(uint*)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); + const uint messageId = *(uint*)client.commandBuffer.constData(); + const int commandId = *(int*)(client.commandBuffer.constData() + sizeof(uint)); + const uint size = *(uint*)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); //TODO: reject messages above a certain size? if (size <= uint(client.commandBuffer.size() - headerSize)) { client.commandBuffer.remove(0, headerSize); - switch (commandId) { - case Akonadi2::Commands::HandshakeCommand: { - flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); - if (Akonadi2::VerifyHandshakeBuffer(verifier)) { - auto buffer = Akonadi2::GetHandshake(client.commandBuffer.constData()); - client.name = buffer->name()->c_str(); - sendCurrentRevision(client); - } - break; - } - case Akonadi2::Commands::SynchronizeCommand: { - Akonadi2::Console::main()->log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); - loadResource(); - if (m_resource) { - m_resource->synchronizeWithSource(m_pipeline); - } - break; - } - case Akonadi2::Commands::FetchEntityCommand: - case Akonadi2::Commands::DeleteEntityCommand: - case Akonadi2::Commands::ModifyEntityCommand: - case Akonadi2::Commands::CreateEntityCommand: - Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); - m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); - break; - default: - if (commandId > Akonadi2::Commands::CustomCommand) { - loadResource(); - if (m_resource) { - m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); - } - } else { - //TODO: handle error: we don't know wtf this command is - } - break; - } - - //TODO: async commands == async sendCommandCompleted - Akonadi2::Console::main()->log(QString("\tCompleted command id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); + processCommand(commandId, messageId, client, size, [this, messageId, commandId, &client]() { + log(QString("\tCompleted command messageid %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); + //FIXME, client needs to become a shared pointer and not a reference, or we have to search through m_connections everytime. + sendCommandCompleted(client, messageId); + }); client.commandBuffer.remove(0, size); - sendCommandCompleted(client, messageId); + return client.commandBuffer.size() >= headerSize; } @@ -259,7 +260,7 @@ void Listener::sendCurrentRevision(Client &client) return; } - auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); + auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_pipeline->storage().maxRevision()); Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); m_fbb.Clear(); @@ -279,14 +280,12 @@ void Listener::sendCommandCompleted(Client &client, uint messageId) void Listener::refreshRevision() { - //TODO this should be coming out of m_pipeline->storage() - ++m_revision; updateClientsWithRevision(); } void Listener::updateClientsWithRevision() { - auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); + auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_pipeline->storage().maxRevision()); Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); for (const Client &client: m_connections) { @@ -308,13 +307,19 @@ void Listener::loadResource() Akonadi2::ResourceFactory *resourceFactory = Akonadi2::ResourceFactory::load(m_resourceName); if (resourceFactory) { m_resource = resourceFactory->createResource(); - Akonadi2::Console::main()->log(QString("Resource factory: %1").arg((qlonglong)resourceFactory)); - Akonadi2::Console::main()->log(QString("\tResource: %1").arg((qlonglong)m_resource)); + log(QString("Resource factory: %1").arg((qlonglong)resourceFactory)); + log(QString("\tResource: %1").arg((qlonglong)m_resource)); //TODO: this doesn't really list all the facades .. fix - Akonadi2::Console::main()->log(QString("\tFacades: %1").arg(Akonadi2::FacadeFactory::instance().getFacade(m_resourceName)->type())); + log(QString("\tFacades: %1").arg(Akonadi2::FacadeFactory::instance().getFacade(m_resourceName)->type())); } else { - Akonadi2::Console::main()->log(QString("Failed to load resource %1").arg(m_resourceName)); + log(QString("Failed to load resource %1").arg(m_resourceName)); } //TODO: on failure ... what? + //Enter broken state? +} + +void Listener::log(const QString &message) +{ + Akonadi2::Console::main()->log("Listener: " + message); } diff --git a/synchronizer/listener.h b/synchronizer/listener.h index 357ae37..4c35191 100644 --- a/synchronizer/listener.h +++ b/synchronizer/listener.h @@ -61,9 +61,6 @@ public: Listener(const QString &resourceName, QObject *parent = 0); ~Listener(); - void setRevision(unsigned long long revision); - unsigned long long revision() const; - Q_SIGNALS: void noClients(); @@ -79,15 +76,16 @@ private Q_SLOTS: void refreshRevision(); private: + void processCommand(int commandId, uint messageId, Client &client, uint size, const std::function &callback); bool processClientBuffer(Client &client); void sendCurrentRevision(Client &client); void sendCommandCompleted(Client &client, uint messageId); void updateClientsWithRevision(); void loadResource(); + void log(const QString &); QLocalServer *m_server; QVector m_connections; - unsigned long long m_revision; flatbuffers::FlatBufferBuilder m_fbb; const QString m_resourceName; Akonadi2::Resource *m_resource; diff --git a/tests/dummyresourcefacadetest.cpp b/tests/dummyresourcefacadetest.cpp index d815e9b..e4d27fc 100644 --- a/tests/dummyresourcefacadetest.cpp +++ b/tests/dummyresourcefacadetest.cpp @@ -51,17 +51,10 @@ private Q_SLOTS: query.ids << "key50"; query.resources << "dummyresource"; - auto result = Akonadi2::Store::load(query); - bool complete = false; - QVector results; - result->onAdded([&results](const Akonadi2::Domain::Event::Ptr &e) { - results << e; - }); - result->onComplete([&complete]() { - complete = true; - }); - QTRY_VERIFY(complete); - QCOMPARE(results.size(), 1); + //FIXME avoid sync somehow. No synchronizer access here (perhaps configure the instance above accordingly?) + async::SyncListResult result(Akonadi2::Store::load(query)); + result.exec(); + QCOMPARE(result.size(), 1); Akonadi2::Storage storage(testDataPath, dbName); storage.removeFromDisk(); diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp index 75d29de..0c02675 100644 --- a/tests/dummyresourcetest.cpp +++ b/tests/dummyresourcetest.cpp @@ -2,7 +2,7 @@ #include -#include "common/resource.h" +#include "dummyresource/resourcefactory.h" #include "clientapi.h" class DummyResourceTest : public QObject @@ -13,12 +13,23 @@ private Q_SLOTS: { auto factory = Akonadi2::ResourceFactory::load("org.kde.dummy"); QVERIFY(factory); + Akonadi2::Storage store(Akonadi2::Store::storageLocation(), "org.kde.dummy", Akonadi2::Storage::ReadWrite); + store.removeFromDisk(); } void cleanupTestCase() { } + void testResource() + { + Akonadi2::Pipeline pipeline("org.kde.dummy"); + DummyResource resource; + auto job = resource.synchronizeWithSource(&pipeline); + auto future = job.exec(); + QTRY_VERIFY(future.isFinished()); + } + void testSync() { Akonadi2::Query query; @@ -27,6 +38,8 @@ private Q_SLOTS: async::SyncListResult result(Akonadi2::Store::load(query)); result.exec(); QVERIFY(!result.isEmpty()); + auto value = result.first(); + qDebug() << value->getProperty("summary"); } }; -- cgit v1.2.3