From 89b6f63bab839ab0504cd3067f0389afe4dc47e3 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 7 Nov 2016 21:48:44 +0100 Subject: Implement debug stream operators for query. --- common/commands/synchronize.fbs | 1 + common/genericresource.cpp | 6 +++--- common/genericresource.h | 2 +- common/listener.cpp | 8 +++++++- common/query.cpp | 20 +++++++++++++++++++- common/query.h | 4 +++- common/queryrunner.cpp | 2 +- common/resource.cpp | 2 +- common/resource.h | 3 ++- common/resourceaccess.cpp | 19 +++++++++++++++++++ common/resourceaccess.h | 2 ++ common/store.cpp | 30 +++++++++++++++++++++++------- common/store.h | 2 ++ common/synchronizer.cpp | 4 ++-- common/synchronizer.h | 4 ++-- 15 files changed, 88 insertions(+), 21 deletions(-) (limited to 'common') diff --git a/common/commands/synchronize.fbs b/common/commands/synchronize.fbs index 5528166..62f4b2b 100644 --- a/common/commands/synchronize.fbs +++ b/common/commands/synchronize.fbs @@ -3,6 +3,7 @@ namespace Sink.Commands; table Synchronize { sourceSync: bool; //Synchronize with source localSync: bool; //Ensure all queues are processed so the local state is up-to date. + query: string; } root_type Synchronize; diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 39bd39e..1fc7744 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -399,9 +399,9 @@ void GenericResource::processCommand(int commandId, const QByteArray &data) } } -KAsync::Job GenericResource::synchronizeWithSource() +KAsync::Job GenericResource::synchronizeWithSource(const Sink::QueryBase &query) { - return KAsync::start([this]() { + return KAsync::start([this, query] { Sink::Notification n; n.id = "sync"; @@ -413,7 +413,7 @@ KAsync::Job GenericResource::synchronizeWithSource() SinkLog() << " Synchronizing"; // Changereplay would deadlock otherwise when trying to open the synchronization store enableChangeReplay(false); - return mSynchronizer->synchronize() + return mSynchronizer->synchronize(query) .then([this](const KAsync::Error &error) { enableChangeReplay(true); if (!error) { diff --git a/common/genericresource.h b/common/genericresource.h index 687e307..3736c8f 100644 --- a/common/genericresource.h +++ b/common/genericresource.h @@ -47,7 +47,7 @@ public: virtual ~GenericResource(); virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; - virtual KAsync::Job synchronizeWithSource() Q_DECL_OVERRIDE; + virtual KAsync::Job synchronizeWithSource(const Sink::QueryBase &query) Q_DECL_OVERRIDE; virtual KAsync::Job processAllMessages() Q_DECL_OVERRIDE; virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE; virtual KAsync::Job diff --git a/common/listener.cpp b/common/listener.cpp index 0742017..c3c6bc2 100644 --- a/common/listener.cpp +++ b/common/listener.cpp @@ -245,7 +245,13 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c timer->start(); auto job = KAsync::null(); if (buffer->sourceSync()) { - job = loadResource().synchronizeWithSource(); + Sink::QueryBase query; + if (buffer->query()) { + auto data = QByteArray::fromStdString(buffer->query()->str()); + QDataStream stream(&data, QIODevice::ReadOnly); + stream >> query; + } + job = loadResource().synchronizeWithSource(query); } if (buffer->localSync()) { job = job.then(loadResource().processAllMessages()); diff --git a/common/query.cpp b/common/query.cpp index 3b717aa..caca775 100644 --- a/common/query.cpp +++ b/common/query.cpp @@ -26,7 +26,7 @@ using namespace Sink; static const int registerQuery = qRegisterMetaTypeStreamOperators(); -QDebug operator<<(QDebug dbg, const Sink::Query::Comparator &c) +QDebug operator<<(QDebug dbg, const Sink::QueryBase::Comparator &c) { if (c.comparator == Sink::Query::Comparator::Equals) { dbg.nospace() << "== " << c.value; @@ -39,6 +39,24 @@ QDebug operator<<(QDebug dbg, const Sink::Query::Comparator &c) return dbg.space(); } +QDebug operator<<(QDebug dbg, const Sink::QueryBase &query) +{ + dbg.nospace() << "Query [" << query.type() << "]\n"; + dbg.nospace() << " Filter: " << query.getBaseFilters() << "\n"; + dbg.nospace() << " Ids: " << query.ids() << "\n"; + dbg.nospace() << " Sorting: " << query.sortProperty() << "\n"; + return dbg.maybeSpace(); +} + +QDebug operator<<(QDebug dbg, const Sink::Query &query) +{ + dbg << static_cast(query); + dbg.nospace() << " Requested: " << query.requestedProperties << "\n"; + dbg.nospace() << " Parent: " << query.parentProperty << "\n"; + dbg.nospace() << " IsLive: " << query.liveQuery() << "\n"; + return dbg.maybeSpace(); +} + QDataStream & operator<< (QDataStream &stream, const Sink::QueryBase::Comparator &comparator) { stream << comparator.comparator; diff --git a/common/query.h b/common/query.h index 4a73357..925b014 100644 --- a/common/query.h +++ b/common/query.h @@ -467,7 +467,9 @@ private: } -QDebug SINK_EXPORT operator<<(QDebug dbg, const Sink::Query::Comparator &c); +QDebug SINK_EXPORT operator<<(QDebug dbg, const Sink::QueryBase::Comparator &); +QDebug SINK_EXPORT operator<<(QDebug dbg, const Sink::QueryBase &); +QDebug SINK_EXPORT operator<<(QDebug dbg, const Sink::Query &); QDataStream & SINK_EXPORT operator<< (QDataStream &stream, const Sink::QueryBase &query); QDataStream & SINK_EXPORT operator>> (QDataStream &stream, Sink::QueryBase &query); diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index cf6fcf8..99f9c2d 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp @@ -62,7 +62,7 @@ template QueryRunner::QueryRunner(const Sink::Query &query, const Sink::ResourceContext &context, const QByteArray &bufferType) : QueryRunnerBase(), mResourceContext(context), mResourceAccess(mResourceContext.resourceAccess()), mResultProvider(new ResultProvider), mBatchSize(query.limit) { - SinkTrace() << "Starting query"; + SinkTrace() << "Starting query. Is live:" << query.liveQuery() << " Limit: " << query.limit; if (query.limit && query.sortProperty().isEmpty()) { SinkWarning() << "A limited query without sorting is typically a bad idea."; } diff --git a/common/resource.cpp b/common/resource.cpp index db64d33..f81f094 100644 --- a/common/resource.cpp +++ b/common/resource.cpp @@ -46,7 +46,7 @@ void Resource::processCommand(int commandId, const QByteArray &data) Q_UNUSED(data) } -KAsync::Job Resource::synchronizeWithSource() +KAsync::Job Resource::synchronizeWithSource(const Sink::QueryBase &query) { return KAsync::null(); } diff --git a/common/resource.h b/common/resource.h index 1dbc365..3cc326c 100644 --- a/common/resource.h +++ b/common/resource.h @@ -28,6 +28,7 @@ namespace Sink { class FacadeFactory; class AdaptorFactoryRegistry; class ResourceContext; +class QueryBase; /** * Resource interface @@ -44,7 +45,7 @@ public: /** * Execute synchronization with the source. */ - virtual KAsync::Job synchronizeWithSource(); + virtual KAsync::Job synchronizeWithSource(const Sink::QueryBase &); /** * Process all internal messages, thus ensuring the store is up to date and no pending modifications are left. diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index e509292..1847949 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp @@ -301,6 +301,25 @@ KAsync::Job ResourceAccess::synchronizeResource(bool sourceSync, bool loca return sendCommand(Commands::SynchronizeCommand, fbb); } +KAsync::Job ResourceAccess::synchronizeResource(const Sink::QueryBase &query) +{ + flatbuffers::FlatBufferBuilder fbb; + QByteArray queryString; + { + QDataStream stream(&queryString, QIODevice::WriteOnly); + stream << query; + } + auto q = fbb.CreateString(queryString.toStdString()); + auto builder = Sink::Commands::SynchronizeBuilder(fbb); + builder.add_sourceSync(true); + builder.add_localSync(false); + builder.add_query(q); + Sink::Commands::FinishSynchronizeBuffer(fbb, builder.Finish()); + + open(); + return sendCommand(Commands::SynchronizeCommand, fbb); +} + KAsync::Job ResourceAccess::sendCreateCommand(const QByteArray &uid, const QByteArray &resourceBufferType, const QByteArray &buffer) { flatbuffers::FlatBufferBuilder fbb; diff --git a/common/resourceaccess.h b/common/resourceaccess.h index 5d66246..95cb667 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h @@ -50,6 +50,7 @@ public: virtual KAsync::Job sendCommand(int commandId) = 0; virtual KAsync::Job sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) = 0; virtual KAsync::Job synchronizeResource(bool remoteSync, bool localSync) = 0; + virtual KAsync::Job synchronizeResource(const Sink::QueryBase &filter) = 0; virtual KAsync::Job sendCreateCommand(const QByteArray &uid, const QByteArray &resourceBufferType, const QByteArray &buffer) { @@ -107,6 +108,7 @@ public: KAsync::Job sendCommand(int commandId) Q_DECL_OVERRIDE; KAsync::Job sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) Q_DECL_OVERRIDE; KAsync::Job synchronizeResource(bool remoteSync, bool localSync) Q_DECL_OVERRIDE; + KAsync::Job synchronizeResource(const Sink::QueryBase &filter) Q_DECL_OVERRIDE; KAsync::Job sendCreateCommand(const QByteArray &uid, const QByteArray &resourceBufferType, const QByteArray &buffer) Q_DECL_OVERRIDE; KAsync::Job sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer, const QByteArrayList &changedProperties) Q_DECL_OVERRIDE; diff --git a/common/store.cpp b/common/store.cpp index 2ea5e22..41b4867 100644 --- a/common/store.cpp +++ b/common/store.cpp @@ -129,13 +129,8 @@ KAsync::Job queryResource(const QByteArray resourceType, const QByteArray template QSharedPointer Store::loadModel(Query query) { - SinkTrace() << "Query: " << ApplicationDomain::getTypeName(); - SinkTrace() << " Requested: " << query.requestedProperties; - SinkTrace() << " Filter: " << query.getBaseFilters(); - SinkTrace() << " Parent: " << query.parentProperty; - SinkTrace() << " Ids: " << query.ids(); - SinkTrace() << " IsLive: " << query.liveQuery(); - SinkTrace() << " Sorting: " << query.sortProperty(); + query.setType(ApplicationDomain::getTypeName()); + SinkTrace() << "Loading model: " << query; auto model = QSharedPointer>::create(query, query.requestedProperties); //* Client defines lifetime of model @@ -276,6 +271,27 @@ KAsync::Job Store::synchronize(const Sink::Query &query) }); } +KAsync::Job Store::synchronize(const Sink::SyncScope &scope) +{ + auto resources = getResources(scope.getResourceFilter()).keys(); + SinkTrace() << "synchronize" << resources; + return KAsync::value(resources) + .template each([scope](const QByteArray &resource) { + SinkTrace() << "Synchronizing " << resource; + auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); + return resourceAccess->synchronizeResource(scope) + .addToContext(resourceAccess) + .then([](const KAsync::Error &error) { + if (error) { + SinkWarning() << "Error during sync."; + return KAsync::error(error); + } + SinkTrace() << "synced."; + return KAsync::null(); + }); + }); +} + template KAsync::Job Store::fetchOne(const Sink::Query &query) { diff --git a/common/store.h b/common/store.h index 571ffff..931e473 100644 --- a/common/store.h +++ b/common/store.h @@ -86,6 +86,8 @@ KAsync::Job SINK_EXPORT remove(const DomainType &domainObject); */ KAsync::Job SINK_EXPORT synchronize(const Sink::Query &query); +KAsync::Job SINK_EXPORT synchronize(const Sink::SyncScope &query); + /** * Removes all resource data from disk. * diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 206cf5e..85c68e4 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp @@ -228,12 +228,12 @@ void Synchronizer::modify(const DomainType &entity) modifyEntity(entity.identifier(), entity.revision(), ApplicationDomain::getTypeName(), entity); } -KAsync::Job Synchronizer::synchronize() +KAsync::Job Synchronizer::synchronize(const Sink::QueryBase &query) { SinkTrace() << "Synchronizing"; mSyncInProgress = true; mMessageQueue->startTransaction(); - return synchronizeWithSource().syncThen([this]() { + return synchronizeWithSource(query).syncThen([this]() { mSyncStore.clear(); mMessageQueue->commit(); mSyncInProgress = false; diff --git a/common/synchronizer.h b/common/synchronizer.h index 12bb587..c03c425 100644 --- a/common/synchronizer.h +++ b/common/synchronizer.h @@ -41,7 +41,7 @@ public: virtual ~Synchronizer(); void setup(const std::function &enqueueCommandCallback, MessageQueue &messageQueue); - KAsync::Job synchronize(); + KAsync::Job synchronize(const Sink::QueryBase &query); //Read only access to main storage Storage::EntityStore &store(); @@ -91,7 +91,7 @@ protected: // template // void remove(const DomainType &entity); - virtual KAsync::Job synchronizeWithSource() = 0; + virtual KAsync::Job synchronizeWithSource(const Sink::QueryBase &query) = 0; private: void modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity); -- cgit v1.2.3