From 22af1ed535b4afc8db3804e72bc5adb1a1b28d60 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Fri, 25 Nov 2016 08:27:06 +0100 Subject: Added the flush command. Instead of trying to actually flush queues, we send a special command through the same queues as the other commands and can thus guarantee that the respective commands have been processed without blocking anything. --- common/CMakeLists.txt | 2 ++ common/commands.cpp | 2 ++ common/commands.h | 1 + common/commands/flush.fbs | 8 ++++++ common/commands/synchronize.fbs | 2 -- common/flush.h | 45 ++++++++++++++++++++++++++++++++ common/genericresource.cpp | 57 +++++++++++++++++++++++++++++++++++++++++ common/genericresource.h | 1 + common/listener.cpp | 19 +++++--------- common/notification.h | 10 +++++--- common/query.h | 6 +++++ common/queuedcommand.fbs | 3 --- common/resourceaccess.cpp | 25 +++++++++--------- common/resourceaccess.h | 9 +++++-- common/resourcecontrol.cpp | 34 +++++++++++++++++++----- common/resourcecontrol.h | 4 +++ common/store.cpp | 18 +------------ common/synchronizer.cpp | 23 +++++++++++++++++ common/synchronizer.h | 21 +++++++++++---- 19 files changed, 227 insertions(+), 63 deletions(-) create mode 100644 common/commands/flush.fbs create mode 100644 common/flush.h (limited to 'common') diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 5ba524b..8a16af4 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -95,6 +95,7 @@ generate_flatbuffers( commands/notification commands/revisionreplayed commands/inspection + commands/flush domain/event domain/mail domain/folder @@ -130,6 +131,7 @@ install(FILES bufferadaptor.h test.h log.h + flush.h ${CMAKE_CURRENT_BINARY_DIR}/sink_export.h DESTINATION ${INCLUDE_INSTALL_DIR}/${PROJECT_NAME} COMPONENT Devel ) diff --git a/common/commands.cpp b/common/commands.cpp index 91657b8..c0781f6 100644 --- a/common/commands.cpp +++ b/common/commands.cpp @@ -61,6 +61,8 @@ QByteArray name(int commandId) return "Inspection"; case RemoveFromDiskCommand: return "RemoveFromDisk"; + case FlushCommand: + return "Flush"; case CustomCommand: return "Custom"; }; diff --git a/common/commands.h b/common/commands.h index b97bbc6..0da1b3c 100644 --- a/common/commands.h +++ b/common/commands.h @@ -48,6 +48,7 @@ enum CommandIds RevisionReplayedCommand, InspectionCommand, RemoveFromDiskCommand, + FlushCommand, CustomCommand = 0xffff }; diff --git a/common/commands/flush.fbs b/common/commands/flush.fbs new file mode 100644 index 0000000..179f760 --- /dev/null +++ b/common/commands/flush.fbs @@ -0,0 +1,8 @@ +namespace Sink.Commands; + +table Flush { + id: string; + type: int; //See flush.h +} + +root_type Flush; diff --git a/common/commands/synchronize.fbs b/common/commands/synchronize.fbs index 62f4b2b..7b32305 100644 --- a/common/commands/synchronize.fbs +++ b/common/commands/synchronize.fbs @@ -1,8 +1,6 @@ namespace Sink.Commands; table Synchronize { - sourceSync: bool; //Synchronize with source - localSync: bool; //Ensure all queues are processed so the local state is up-to date. query: string; } diff --git a/common/flush.h b/common/flush.h new file mode 100644 index 0000000..3f04608 --- /dev/null +++ b/common/flush.h @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#pragma once + +#include "sink_export.h" + +namespace Sink { +namespace Flush { + +enum FlushType { + /** + * Guarantees that any commands issued before this flush are written back to the server once this flush completes. + * Note that this does not guarantee the success of writeback, only that an attempt has been made. + */ + FlushReplayQueue, + /** + * Guarantees that any synchronization request issued before this flush has been executed and that all entities created by it have been processed once this flush completes. + */ + FlushSynchronization, + /** + * Guarantees that any modification issued before this flush has been processed once this flush completes. + */ + FlushUserQueue +}; + +} +} + diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 7c4d4ea..7b83957 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -27,6 +27,7 @@ #include "deleteentity_generated.h" #include "inspection_generated.h" #include "notification_generated.h" +#include "flush_generated.h" #include "domainadaptor.h" #include "commands.h" #include "index.h" @@ -54,6 +55,7 @@ class CommandProcessor : public QObject { Q_OBJECT typedef std::function(void const *, size_t)> InspectionFunction; + typedef std::function(void const *, size_t)> FlushFunction; SINK_DEBUG_AREA("commandprocessor") public: @@ -75,6 +77,11 @@ public: mInspect = f; } + void setFlushCommand(const FlushFunction &f) + { + mFlush = f; + } + signals: void error(int errorCode, const QString &errorMessage); @@ -124,6 +131,13 @@ private slots: } else { return KAsync::error(-1, "Missing inspection command."); } + case Sink::Commands::FlushCommand: + if (mFlush) { + return mFlush(queuedCommand->command()->Data(), queuedCommand->command()->size()) + .syncThen([]() { return -1; }); + } else { + return KAsync::error(-1, "Missing inspection command."); + } default: return KAsync::error(-1, "Unhandled command"); } @@ -219,6 +233,7 @@ private: // The lowest revision we no longer need qint64 mLowerBoundRevision; InspectionFunction mInspect; + FlushFunction mFlush; }; GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer &pipeline ) @@ -266,6 +281,26 @@ GenericResource::GenericResource(const ResourceContext &resourceContext, const Q } return KAsync::error(-1, "Invalid inspection command."); }); + mProcessor->setFlushCommand([this](void const *command, size_t size) { + flatbuffers::Verifier verifier((const uint8_t *)command, size); + if (Sink::Commands::VerifyFlushBuffer(verifier)) { + auto buffer = Sink::Commands::GetFlush(command); + const auto flushType = buffer->type(); + const auto flushId = BufferUtils::extractBuffer(buffer->id()); + if (flushType == Sink::Flush::FlushReplayQueue) { + SinkTrace() << "Flushing synchronizer "; + mSynchronizer->flush(flushType, flushId); + } else { + SinkTrace() << "Emitting flush completion" << flushId; + Sink::Notification n; + n.type = Sink::Notification::FlushCompletion; + n.id = flushId; + emit notify(n); + } + return KAsync::null(); + } + return KAsync::error(-1, "Invalid flush command."); + }); { auto ret = QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); Q_ASSERT(ret); @@ -371,6 +406,10 @@ void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByt void GenericResource::processCommand(int commandId, const QByteArray &data) { + if (commandId == Commands::FlushCommand) { + processFlushCommand(data); + return; + } static int modifications = 0; mUserQueue.startTransaction(); enqueueCommand(mUserQueue, commandId, data); @@ -384,6 +423,24 @@ void GenericResource::processCommand(int commandId, const QByteArray &data) } } +void GenericResource::processFlushCommand(const QByteArray &data) +{ + flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size()); + if (Sink::Commands::VerifyFlushBuffer(verifier)) { + auto buffer = Sink::Commands::GetFlush(data.constData()); + const auto flushType = buffer->type(); + const auto flushId = BufferUtils::extractBuffer(buffer->id()); + if (flushType == Sink::Flush::FlushSynchronization) { + mSynchronizer->flush(flushType, flushId); + } else { + mUserQueue.startTransaction(); + enqueueCommand(mUserQueue, Commands::FlushCommand, data); + mUserQueue.commit(); + } + } + +} + KAsync::Job GenericResource::synchronizeWithSource(const Sink::QueryBase &query) { return KAsync::start([this, query] { diff --git a/common/genericresource.h b/common/genericresource.h index 3f92e93..9447c8b 100644 --- a/common/genericresource.h +++ b/common/genericresource.h @@ -47,6 +47,7 @@ public: virtual ~GenericResource(); virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; + virtual void processFlushCommand(const QByteArray &data); virtual KAsync::Job synchronizeWithSource(const Sink::QueryBase &query) Q_DECL_OVERRIDE; virtual KAsync::Job processAllMessages() Q_DECL_OVERRIDE; virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE; diff --git a/common/listener.cpp b/common/listener.cpp index c3c6bc2..2ab0333 100644 --- a/common/listener.cpp +++ b/common/listener.cpp @@ -33,7 +33,6 @@ #include "common/synchronize_generated.h" #include "common/notification_generated.h" #include "common/revisionreplayed_generated.h" -#include "common/inspection_generated.h" #include #include @@ -244,18 +243,13 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c auto timer = QSharedPointer::create(); timer->start(); auto job = KAsync::null(); - if (buffer->sourceSync()) { - Sink::QueryBase query; - if (buffer->query()) { - auto data = QByteArray::fromStdString(buffer->query()->str()); - QDataStream stream(&data, QIODevice::ReadOnly); - stream >> query; - } - job = loadResource().synchronizeWithSource(query); - } - if (buffer->localSync()) { - job = job.then(loadResource().processAllMessages()); + Sink::QueryBase query; + if (buffer->query()) { + auto data = QByteArray::fromStdString(buffer->query()->str()); + QDataStream stream(&data, QIODevice::ReadOnly); + stream >> query; } + job = loadResource().synchronizeWithSource(query); job.then([callback, timer](const KAsync::Error &error) { if (error) { SinkWarning() << "Sync failed: " << error.errorMessage; @@ -279,6 +273,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c case Sink::Commands::DeleteEntityCommand: case Sink::Commands::ModifyEntityCommand: case Sink::Commands::CreateEntityCommand: + case Sink::Commands::FlushCommand: SinkTrace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name; loadResource().processCommand(commandId, commandBuffer); break; diff --git a/common/notification.h b/common/notification.h index dcf00a3..b1bd290 100644 --- a/common/notification.h +++ b/common/notification.h @@ -37,17 +37,19 @@ public: Warning, Progress, Inspection, - RevisionUpdate + RevisionUpdate, + FlushCompletion }; enum InspectionCode { - Success, + Success = 0, Failure }; QByteArray id; - int type; + int type = 0; QString message; - int code; + //A return code. Zero typically indicates success. + int code = 0; }; } diff --git a/common/query.h b/common/query.h index 0bc5141..2adb7e9 100644 --- a/common/query.h +++ b/common/query.h @@ -465,6 +465,12 @@ class SyncScope : public QueryBase { public: using QueryBase::QueryBase; + SyncScope(const QueryBase &other) + : QueryBase(other) + { + + } + Query::Filter getResourceFilter() const { return mResourceFilter; diff --git a/common/queuedcommand.fbs b/common/queuedcommand.fbs index 06226d3..114e2cd 100644 --- a/common/queuedcommand.fbs +++ b/common/queuedcommand.fbs @@ -3,9 +3,6 @@ namespace Sink; table QueuedCommand { commandId: int; command: [ubyte]; - // entityId: string; - // sourceRevision: ulong; - // targetRevision: [ubyte]; } root_type QueuedCommand; diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 822b5cd..b46e8b2 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp @@ -31,6 +31,7 @@ #include "common/deleteentity_generated.h" #include "common/revisionreplayed_generated.h" #include "common/inspection_generated.h" +#include "common/flush_generated.h" #include "common/entitybuffer.h" #include "common/bufferutils.h" #include "common/test.h" @@ -291,16 +292,6 @@ KAsync::Job ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu }); } -KAsync::Job ResourceAccess::synchronizeResource(bool sourceSync, bool localSync) -{ - SinkTrace() << "Sending synchronize command: " << sourceSync << localSync; - flatbuffers::FlatBufferBuilder fbb; - auto command = Sink::Commands::CreateSynchronize(fbb, sourceSync, localSync); - Sink::Commands::FinishSynchronizeBuffer(fbb, command); - open(); - return sendCommand(Commands::SynchronizeCommand, fbb); -} - KAsync::Job ResourceAccess::synchronizeResource(const Sink::QueryBase &query) { flatbuffers::FlatBufferBuilder fbb; @@ -311,8 +302,6 @@ KAsync::Job ResourceAccess::synchronizeResource(const Sink::QueryBase &que } auto q = fbb.CreateString(queryString.toStdString()); auto builder = Sink::Commands::SynchronizeBuilder(fbb); - builder.add_sourceSync(true); - builder.add_localSync(false); builder.add_query(q); Sink::Commands::FinishSynchronizeBuffer(fbb, builder.Finish()); @@ -390,6 +379,16 @@ ResourceAccess::sendInspectionCommand(int inspectionType, const QByteArray &insp return sendCommand(Sink::Commands::InspectionCommand, fbb); } +KAsync::Job ResourceAccess::sendFlushCommand(int flushType, const QByteArray &flushId) +{ + flatbuffers::FlatBufferBuilder fbb; + auto id = fbb.CreateString(flushId.toStdString()); + auto location = Sink::Commands::CreateFlush(fbb, id, flushType); + Sink::Commands::FinishFlushBuffer(fbb, location); + open(); + return sendCommand(Sink::Commands::FlushCommand, fbb); +} + void ResourceAccess::open() { if (d->socket && d->socket->isValid()) { @@ -613,6 +612,8 @@ bool ResourceAccess::processMessageBuffer() [[clang::fallthrough]]; case Sink::Notification::Warning: [[clang::fallthrough]]; + case Sink::Notification::FlushCompletion: + [[clang::fallthrough]]; case Sink::Notification::Progress: { auto n = getNotification(buffer); SinkTrace() << "Received notification: " << n.type; diff --git a/common/resourceaccess.h b/common/resourceaccess.h index 755c8a7..4229161 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h @@ -29,6 +29,7 @@ #include #include "notification.h" +#include "flush.h" #include "query.h" #include "log.h" @@ -50,7 +51,6 @@ public: } virtual KAsync::Job sendCommand(int commandId) = 0; virtual KAsync::Job sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) = 0; - virtual KAsync::Job synchronizeResource(bool remoteSync, bool localSync) = 0; virtual KAsync::Job synchronizeResource(const Sink::QueryBase &filter) = 0; virtual KAsync::Job sendCreateCommand(const QByteArray &uid, const QByteArray &resourceBufferType, const QByteArray &buffer) @@ -75,6 +75,11 @@ public: return KAsync::null(); }; + virtual KAsync::Job sendFlushCommand(int flushType, const QByteArray &flushId) + { + return KAsync::null(); + } + int getResourceStatus() const { return mResourceStatus; @@ -108,7 +113,6 @@ public: KAsync::Job sendCommand(int commandId) Q_DECL_OVERRIDE; KAsync::Job sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) Q_DECL_OVERRIDE; - KAsync::Job synchronizeResource(bool remoteSync, bool localSync) Q_DECL_OVERRIDE; KAsync::Job synchronizeResource(const Sink::QueryBase &filter) Q_DECL_OVERRIDE; KAsync::Job sendCreateCommand(const QByteArray &uid, const QByteArray &resourceBufferType, const QByteArray &buffer) Q_DECL_OVERRIDE; KAsync::Job @@ -117,6 +121,7 @@ public: KAsync::Job sendRevisionReplayedCommand(qint64 revision) Q_DECL_OVERRIDE; KAsync::Job sendInspectionCommand(int inspectionType,const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expecedValue) Q_DECL_OVERRIDE; + KAsync::Job sendFlushCommand(int flushType, const QByteArray &flushId) Q_DECL_OVERRIDE; /** * Tries to connect to server, and returns a connected socket on success. */ diff --git a/common/resourcecontrol.cpp b/common/resourcecontrol.cpp index 3568844..af98b8b 100644 --- a/common/resourcecontrol.cpp +++ b/common/resourcecontrol.cpp @@ -85,17 +85,39 @@ KAsync::Job ResourceControl::flushMessageQueue(const QByteArrayList &resou SinkTrace() << "flushMessageQueue" << resourceIdentifier; return KAsync::value(resourceIdentifier) .template each([](const QByteArray &resource) { - SinkTrace() << "Flushing message queue " << resource; - auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); - resourceAccess->open(); - return resourceAccess->synchronizeResource(false, true) - .addToContext(resourceAccess); + return flushMessageQueue(resource); }); } KAsync::Job ResourceControl::flushMessageQueue(const QByteArray &resourceIdentifier) { - return flushMessageQueue(QByteArrayList() << resourceIdentifier); + return flush(Flush::FlushUserQueue, resourceIdentifier).then(flush(Flush::FlushSynchronization, resourceIdentifier)); +} + +KAsync::Job ResourceControl::flush(Flush::FlushType type, const QByteArray &resourceIdentifier) +{ + auto resourceAccess = ResourceAccessFactory::instance().getAccess(resourceIdentifier, ResourceConfig::getResourceType(resourceIdentifier)); + auto notifier = QSharedPointer::create(resourceAccess); + auto id = QUuid::createUuid().toByteArray(); + return KAsync::start([=](KAsync::Future &future) { + SinkTrace() << "Waiting for notification notification " << id; + notifier->registerHandler([&future, id](const Notification ¬ification) { + SinkTrace() << "Received notification " << notification.type << notification.id; + if (notification.id == id) { + SinkTrace() << "FlushComplete"; + if (notification.code) { + SinkWarning() << "Flush return an error"; + future.setError(-1, "Flush returned an error: " + notification.message); + } else { + future.setFinished(); + } + } + }); + resourceAccess->sendFlushCommand(type, id).onError([&future] (const KAsync::Error &error) { + SinkWarning() << "Failed to send command"; + future.setError(1, "Failed to send command: " + error.errorMessage); + }).exec(); + }); } KAsync::Job ResourceControl::flushReplayQueue(const QByteArrayList &resourceIdentifier) diff --git a/common/resourcecontrol.h b/common/resourcecontrol.h index 9e603e4..b910441 100644 --- a/common/resourcecontrol.h +++ b/common/resourcecontrol.h @@ -26,6 +26,7 @@ #include #include "inspection.h" +#include "flush.h" namespace Sink { namespace ResourceControl { @@ -58,5 +59,8 @@ KAsync::Job SINK_EXPORT flushMessageQueue(const QByteArray &resourceIdenti */ KAsync::Job SINK_EXPORT flushReplayQueue(const QByteArrayList &resourceIdentifier); KAsync::Job SINK_EXPORT flushReplayQueue(const QByteArray &resourceIdentifier); + +KAsync::Job SINK_EXPORT flush(Flush::FlushType, const QByteArray &resourceIdentifier); + } } diff --git a/common/store.cpp b/common/store.cpp index 6aae00f..8b8de1f 100644 --- a/common/store.cpp +++ b/common/store.cpp @@ -255,23 +255,7 @@ KAsync::Job Store::removeDataFromDisk(const QByteArray &identifier) KAsync::Job Store::synchronize(const Sink::Query &query) { - auto resources = getResources(query.getResourceFilter()).keys(); - SinkTrace() << "synchronize" << resources; - return KAsync::value(resources) - .template each([query](const QByteArray &resource) { - SinkTrace() << "Synchronizing " << resource; - auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); - return resourceAccess->synchronizeResource(true, false) - .addToContext(resourceAccess) - .then([](const KAsync::Error &error) { - if (error) { - SinkWarning() << "Error during sync."; - return KAsync::error(error); - } - SinkTrace() << "synced."; - return KAsync::null(); - }); - }); + return synchronize(Sink::SyncScope{static_cast(query)}); } KAsync::Job Store::synchronize(const Sink::SyncScope &scope) diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 5bde597..f7dd816 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp @@ -27,6 +27,8 @@ #include "createentity_generated.h" #include "modifyentity_generated.h" #include "deleteentity_generated.h" +#include "flush_generated.h" +#include "notification_generated.h" SINK_DEBUG_AREA("synchronizer") @@ -263,6 +265,13 @@ KAsync::Job Synchronizer::synchronize(const Sink::QueryBase &query) return processSyncQueue(); } +void Synchronizer::flush(int commandId, const QByteArray &flushId) +{ + SinkTrace() << "Flushing the synchronization queue"; + mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::Flush, commandId, flushId}; + processSyncQueue().exec(); +} + KAsync::Job Synchronizer::processSyncQueue() { if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { @@ -279,6 +288,20 @@ KAsync::Job Synchronizer::processSyncQueue() //Commit after every request, so implementations only have to commit more if they add a lot of data. commit(); }); + } else if (request.requestType == Synchronizer::SyncRequest::Flush) { + if (request.flushType == Flush::FlushReplayQueue) { + SinkTrace() << "Emitting flush completion."; + Sink::Notification n; + n.type = Sink::Notification::FlushCompletion; + n.id = request.flushId; + emit notify(n); + } else { + flatbuffers::FlatBufferBuilder fbb; + auto flushId = fbb.CreateString(request.flushId); + auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast(Sink::Flush::FlushSynchronization)); + Sink::Commands::FinishFlushBuffer(fbb, location); + enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); + } } else { job = replayNextRevision(); } diff --git a/common/synchronizer.h b/common/synchronizer.h index 4d5bdd5..99d4877 100644 --- a/common/synchronizer.h +++ b/common/synchronizer.h @@ -45,6 +45,7 @@ public: void setup(const std::function &enqueueCommandCallback, MessageQueue &messageQueue); KAsync::Job synchronize(const Sink::QueryBase &query); + void flush(int commandId, const QByteArray &flushId); //Read only access to main storage Storage::EntityStore &store(); @@ -57,6 +58,9 @@ public: bool allChangesReplayed() Q_DECL_OVERRIDE; +signals: + void notify(Notification); + public slots: virtual void revisionChanged() Q_DECL_OVERRIDE; @@ -115,23 +119,30 @@ protected: struct SyncRequest { enum RequestType { Synchronization, - ChangeReplay + ChangeReplay, + Flush }; SyncRequest(const Sink::QueryBase &q) - : flushQueue(false), - requestType(Synchronization), + : requestType(Synchronization), query(q) { } SyncRequest(RequestType type) - : flushQueue(false), + : requestType(type) + { + } + + SyncRequest(RequestType type, int flushType_, const QByteArray &flushId_) + : flushType(flushType_), + flushId(flushId_), requestType(type) { } - bool flushQueue; + int flushType = 0; + QByteArray flushId; RequestType requestType; Sink::QueryBase query; }; -- cgit v1.2.3