diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-25 08:27:06 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-25 09:23:55 +0100 |
commit | 22af1ed535b4afc8db3804e72bc5adb1a1b28d60 (patch) | |
tree | 07665f41d5b40d658e95a64bb76020f1fd9d088e | |
parent | 64d7d7bdd1edb2bcc305ca007784d0708cf7ef3c (diff) | |
download | sink-22af1ed535b4afc8db3804e72bc5adb1a1b28d60.tar.gz sink-22af1ed535b4afc8db3804e72bc5adb1a1b28d60.zip |
Added the flush command.
Instead of trying to actually flush queues, we send a special command
through the same queues as the other commands and can thus guarantee
that the respective commands have been processed without blocking
anything.
-rw-r--r-- | common/CMakeLists.txt | 2 | ||||
-rw-r--r-- | common/commands.cpp | 2 | ||||
-rw-r--r-- | common/commands.h | 1 | ||||
-rw-r--r-- | common/commands/flush.fbs | 8 | ||||
-rw-r--r-- | common/commands/synchronize.fbs | 2 | ||||
-rw-r--r-- | common/flush.h | 45 | ||||
-rw-r--r-- | common/genericresource.cpp | 57 | ||||
-rw-r--r-- | common/genericresource.h | 1 | ||||
-rw-r--r-- | common/listener.cpp | 19 | ||||
-rw-r--r-- | common/notification.h | 10 | ||||
-rw-r--r-- | common/query.h | 6 | ||||
-rw-r--r-- | common/queuedcommand.fbs | 3 | ||||
-rw-r--r-- | common/resourceaccess.cpp | 25 | ||||
-rw-r--r-- | common/resourceaccess.h | 9 | ||||
-rw-r--r-- | common/resourcecontrol.cpp | 34 | ||||
-rw-r--r-- | common/resourcecontrol.h | 4 | ||||
-rw-r--r-- | common/store.cpp | 18 | ||||
-rw-r--r-- | common/synchronizer.cpp | 23 | ||||
-rw-r--r-- | common/synchronizer.h | 21 | ||||
-rw-r--r-- | tests/testimplementations.h | 4 |
20 files changed, 227 insertions, 67 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 5ba524b..8a16af4 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt | |||
@@ -95,6 +95,7 @@ generate_flatbuffers( | |||
95 | commands/notification | 95 | commands/notification |
96 | commands/revisionreplayed | 96 | commands/revisionreplayed |
97 | commands/inspection | 97 | commands/inspection |
98 | commands/flush | ||
98 | domain/event | 99 | domain/event |
99 | domain/mail | 100 | domain/mail |
100 | domain/folder | 101 | domain/folder |
@@ -130,6 +131,7 @@ install(FILES | |||
130 | bufferadaptor.h | 131 | bufferadaptor.h |
131 | test.h | 132 | test.h |
132 | log.h | 133 | log.h |
134 | flush.h | ||
133 | ${CMAKE_CURRENT_BINARY_DIR}/sink_export.h | 135 | ${CMAKE_CURRENT_BINARY_DIR}/sink_export.h |
134 | DESTINATION ${INCLUDE_INSTALL_DIR}/${PROJECT_NAME} COMPONENT Devel | 136 | DESTINATION ${INCLUDE_INSTALL_DIR}/${PROJECT_NAME} COMPONENT Devel |
135 | ) | 137 | ) |
diff --git a/common/commands.cpp b/common/commands.cpp index 91657b8..c0781f6 100644 --- a/common/commands.cpp +++ b/common/commands.cpp | |||
@@ -61,6 +61,8 @@ QByteArray name(int commandId) | |||
61 | return "Inspection"; | 61 | return "Inspection"; |
62 | case RemoveFromDiskCommand: | 62 | case RemoveFromDiskCommand: |
63 | return "RemoveFromDisk"; | 63 | return "RemoveFromDisk"; |
64 | case FlushCommand: | ||
65 | return "Flush"; | ||
64 | case CustomCommand: | 66 | case CustomCommand: |
65 | return "Custom"; | 67 | return "Custom"; |
66 | }; | 68 | }; |
diff --git a/common/commands.h b/common/commands.h index b97bbc6..0da1b3c 100644 --- a/common/commands.h +++ b/common/commands.h | |||
@@ -48,6 +48,7 @@ enum CommandIds | |||
48 | RevisionReplayedCommand, | 48 | RevisionReplayedCommand, |
49 | InspectionCommand, | 49 | InspectionCommand, |
50 | RemoveFromDiskCommand, | 50 | RemoveFromDiskCommand, |
51 | FlushCommand, | ||
51 | CustomCommand = 0xffff | 52 | CustomCommand = 0xffff |
52 | }; | 53 | }; |
53 | 54 | ||
diff --git a/common/commands/flush.fbs b/common/commands/flush.fbs new file mode 100644 index 0000000..179f760 --- /dev/null +++ b/common/commands/flush.fbs | |||
@@ -0,0 +1,8 @@ | |||
1 | namespace Sink.Commands; | ||
2 | |||
3 | table Flush { | ||
4 | id: string; | ||
5 | type: int; //See flush.h | ||
6 | } | ||
7 | |||
8 | root_type Flush; | ||
diff --git a/common/commands/synchronize.fbs b/common/commands/synchronize.fbs index 62f4b2b..7b32305 100644 --- a/common/commands/synchronize.fbs +++ b/common/commands/synchronize.fbs | |||
@@ -1,8 +1,6 @@ | |||
1 | namespace Sink.Commands; | 1 | namespace Sink.Commands; |
2 | 2 | ||
3 | table Synchronize { | 3 | table Synchronize { |
4 | sourceSync: bool; //Synchronize with source | ||
5 | localSync: bool; //Ensure all queues are processed so the local state is up-to date. | ||
6 | query: string; | 4 | query: string; |
7 | } | 5 | } |
8 | 6 | ||
diff --git a/common/flush.h b/common/flush.h new file mode 100644 index 0000000..3f04608 --- /dev/null +++ b/common/flush.h | |||
@@ -0,0 +1,45 @@ | |||
1 | /* | ||
2 | * Copyright (c) 2016 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | #pragma once | ||
21 | |||
22 | #include "sink_export.h" | ||
23 | |||
24 | namespace Sink { | ||
25 | namespace Flush { | ||
26 | |||
27 | enum FlushType { | ||
28 | /** | ||
29 | * Guarantees that any commands issued before this flush are written back to the server once this flush completes. | ||
30 | * Note that this does not guarantee the success of writeback, only that an attempt has been made. | ||
31 | */ | ||
32 | FlushReplayQueue, | ||
33 | /** | ||
34 | * Guarantees that any synchronization request issued before this flush has been executed and that all entities created by it have been processed once this flush completes. | ||
35 | */ | ||
36 | FlushSynchronization, | ||
37 | /** | ||
38 | * Guarantees that any modification issued before this flush has been processed once this flush completes. | ||
39 | */ | ||
40 | FlushUserQueue | ||
41 | }; | ||
42 | |||
43 | } | ||
44 | } | ||
45 | |||
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 7c4d4ea..7b83957 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -27,6 +27,7 @@ | |||
27 | #include "deleteentity_generated.h" | 27 | #include "deleteentity_generated.h" |
28 | #include "inspection_generated.h" | 28 | #include "inspection_generated.h" |
29 | #include "notification_generated.h" | 29 | #include "notification_generated.h" |
30 | #include "flush_generated.h" | ||
30 | #include "domainadaptor.h" | 31 | #include "domainadaptor.h" |
31 | #include "commands.h" | 32 | #include "commands.h" |
32 | #include "index.h" | 33 | #include "index.h" |
@@ -54,6 +55,7 @@ class CommandProcessor : public QObject | |||
54 | { | 55 | { |
55 | Q_OBJECT | 56 | Q_OBJECT |
56 | typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction; | 57 | typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction; |
58 | typedef std::function<KAsync::Job<void>(void const *, size_t)> FlushFunction; | ||
57 | SINK_DEBUG_AREA("commandprocessor") | 59 | SINK_DEBUG_AREA("commandprocessor") |
58 | 60 | ||
59 | public: | 61 | public: |
@@ -75,6 +77,11 @@ public: | |||
75 | mInspect = f; | 77 | mInspect = f; |
76 | } | 78 | } |
77 | 79 | ||
80 | void setFlushCommand(const FlushFunction &f) | ||
81 | { | ||
82 | mFlush = f; | ||
83 | } | ||
84 | |||
78 | signals: | 85 | signals: |
79 | void error(int errorCode, const QString &errorMessage); | 86 | void error(int errorCode, const QString &errorMessage); |
80 | 87 | ||
@@ -124,6 +131,13 @@ private slots: | |||
124 | } else { | 131 | } else { |
125 | return KAsync::error<qint64>(-1, "Missing inspection command."); | 132 | return KAsync::error<qint64>(-1, "Missing inspection command."); |
126 | } | 133 | } |
134 | case Sink::Commands::FlushCommand: | ||
135 | if (mFlush) { | ||
136 | return mFlush(queuedCommand->command()->Data(), queuedCommand->command()->size()) | ||
137 | .syncThen<qint64>([]() { return -1; }); | ||
138 | } else { | ||
139 | return KAsync::error<qint64>(-1, "Missing inspection command."); | ||
140 | } | ||
127 | default: | 141 | default: |
128 | return KAsync::error<qint64>(-1, "Unhandled command"); | 142 | return KAsync::error<qint64>(-1, "Unhandled command"); |
129 | } | 143 | } |
@@ -219,6 +233,7 @@ private: | |||
219 | // The lowest revision we no longer need | 233 | // The lowest revision we no longer need |
220 | qint64 mLowerBoundRevision; | 234 | qint64 mLowerBoundRevision; |
221 | InspectionFunction mInspect; | 235 | InspectionFunction mInspect; |
236 | FlushFunction mFlush; | ||
222 | }; | 237 | }; |
223 | 238 | ||
224 | GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline ) | 239 | GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline ) |
@@ -266,6 +281,26 @@ GenericResource::GenericResource(const ResourceContext &resourceContext, const Q | |||
266 | } | 281 | } |
267 | return KAsync::error<void>(-1, "Invalid inspection command."); | 282 | return KAsync::error<void>(-1, "Invalid inspection command."); |
268 | }); | 283 | }); |
284 | mProcessor->setFlushCommand([this](void const *command, size_t size) { | ||
285 | flatbuffers::Verifier verifier((const uint8_t *)command, size); | ||
286 | if (Sink::Commands::VerifyFlushBuffer(verifier)) { | ||
287 | auto buffer = Sink::Commands::GetFlush(command); | ||
288 | const auto flushType = buffer->type(); | ||
289 | const auto flushId = BufferUtils::extractBuffer(buffer->id()); | ||
290 | if (flushType == Sink::Flush::FlushReplayQueue) { | ||
291 | SinkTrace() << "Flushing synchronizer "; | ||
292 | mSynchronizer->flush(flushType, flushId); | ||
293 | } else { | ||
294 | SinkTrace() << "Emitting flush completion" << flushId; | ||
295 | Sink::Notification n; | ||
296 | n.type = Sink::Notification::FlushCompletion; | ||
297 | n.id = flushId; | ||
298 | emit notify(n); | ||
299 | } | ||
300 | return KAsync::null<void>(); | ||
301 | } | ||
302 | return KAsync::error<void>(-1, "Invalid flush command."); | ||
303 | }); | ||
269 | { | 304 | { |
270 | auto ret = QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | 305 | auto ret = QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); |
271 | Q_ASSERT(ret); | 306 | Q_ASSERT(ret); |
@@ -371,6 +406,10 @@ void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByt | |||
371 | 406 | ||
372 | void GenericResource::processCommand(int commandId, const QByteArray &data) | 407 | void GenericResource::processCommand(int commandId, const QByteArray &data) |
373 | { | 408 | { |
409 | if (commandId == Commands::FlushCommand) { | ||
410 | processFlushCommand(data); | ||
411 | return; | ||
412 | } | ||
374 | static int modifications = 0; | 413 | static int modifications = 0; |
375 | mUserQueue.startTransaction(); | 414 | mUserQueue.startTransaction(); |
376 | enqueueCommand(mUserQueue, commandId, data); | 415 | enqueueCommand(mUserQueue, commandId, data); |
@@ -384,6 +423,24 @@ void GenericResource::processCommand(int commandId, const QByteArray &data) | |||
384 | } | 423 | } |
385 | } | 424 | } |
386 | 425 | ||
426 | void GenericResource::processFlushCommand(const QByteArray &data) | ||
427 | { | ||
428 | flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size()); | ||
429 | if (Sink::Commands::VerifyFlushBuffer(verifier)) { | ||
430 | auto buffer = Sink::Commands::GetFlush(data.constData()); | ||
431 | const auto flushType = buffer->type(); | ||
432 | const auto flushId = BufferUtils::extractBuffer(buffer->id()); | ||
433 | if (flushType == Sink::Flush::FlushSynchronization) { | ||
434 | mSynchronizer->flush(flushType, flushId); | ||
435 | } else { | ||
436 | mUserQueue.startTransaction(); | ||
437 | enqueueCommand(mUserQueue, Commands::FlushCommand, data); | ||
438 | mUserQueue.commit(); | ||
439 | } | ||
440 | } | ||
441 | |||
442 | } | ||
443 | |||
387 | KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase &query) | 444 | KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase &query) |
388 | { | 445 | { |
389 | return KAsync::start<void>([this, query] { | 446 | return KAsync::start<void>([this, query] { |
diff --git a/common/genericresource.h b/common/genericresource.h index 3f92e93..9447c8b 100644 --- a/common/genericresource.h +++ b/common/genericresource.h | |||
@@ -47,6 +47,7 @@ public: | |||
47 | virtual ~GenericResource(); | 47 | virtual ~GenericResource(); |
48 | 48 | ||
49 | virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; | 49 | virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; |
50 | virtual void processFlushCommand(const QByteArray &data); | ||
50 | virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) Q_DECL_OVERRIDE; | 51 | virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) Q_DECL_OVERRIDE; |
51 | virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE; | 52 | virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE; |
52 | virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE; | 53 | virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE; |
diff --git a/common/listener.cpp b/common/listener.cpp index c3c6bc2..2ab0333 100644 --- a/common/listener.cpp +++ b/common/listener.cpp | |||
@@ -33,7 +33,6 @@ | |||
33 | #include "common/synchronize_generated.h" | 33 | #include "common/synchronize_generated.h" |
34 | #include "common/notification_generated.h" | 34 | #include "common/notification_generated.h" |
35 | #include "common/revisionreplayed_generated.h" | 35 | #include "common/revisionreplayed_generated.h" |
36 | #include "common/inspection_generated.h" | ||
37 | 36 | ||
38 | #include <QLocalServer> | 37 | #include <QLocalServer> |
39 | #include <QLocalSocket> | 38 | #include <QLocalSocket> |
@@ -244,18 +243,13 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c | |||
244 | auto timer = QSharedPointer<QTime>::create(); | 243 | auto timer = QSharedPointer<QTime>::create(); |
245 | timer->start(); | 244 | timer->start(); |
246 | auto job = KAsync::null<void>(); | 245 | auto job = KAsync::null<void>(); |
247 | if (buffer->sourceSync()) { | 246 | Sink::QueryBase query; |
248 | Sink::QueryBase query; | 247 | if (buffer->query()) { |
249 | if (buffer->query()) { | 248 | auto data = QByteArray::fromStdString(buffer->query()->str()); |
250 | auto data = QByteArray::fromStdString(buffer->query()->str()); | 249 | QDataStream stream(&data, QIODevice::ReadOnly); |
251 | QDataStream stream(&data, QIODevice::ReadOnly); | 250 | stream >> query; |
252 | stream >> query; | ||
253 | } | ||
254 | job = loadResource().synchronizeWithSource(query); | ||
255 | } | ||
256 | if (buffer->localSync()) { | ||
257 | job = job.then<void>(loadResource().processAllMessages()); | ||
258 | } | 251 | } |
252 | job = loadResource().synchronizeWithSource(query); | ||
259 | job.then<void>([callback, timer](const KAsync::Error &error) { | 253 | job.then<void>([callback, timer](const KAsync::Error &error) { |
260 | if (error) { | 254 | if (error) { |
261 | SinkWarning() << "Sync failed: " << error.errorMessage; | 255 | SinkWarning() << "Sync failed: " << error.errorMessage; |
@@ -279,6 +273,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c | |||
279 | case Sink::Commands::DeleteEntityCommand: | 273 | case Sink::Commands::DeleteEntityCommand: |
280 | case Sink::Commands::ModifyEntityCommand: | 274 | case Sink::Commands::ModifyEntityCommand: |
281 | case Sink::Commands::CreateEntityCommand: | 275 | case Sink::Commands::CreateEntityCommand: |
276 | case Sink::Commands::FlushCommand: | ||
282 | SinkTrace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name; | 277 | SinkTrace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name; |
283 | loadResource().processCommand(commandId, commandBuffer); | 278 | loadResource().processCommand(commandId, commandBuffer); |
284 | break; | 279 | break; |
diff --git a/common/notification.h b/common/notification.h index dcf00a3..b1bd290 100644 --- a/common/notification.h +++ b/common/notification.h | |||
@@ -37,17 +37,19 @@ public: | |||
37 | Warning, | 37 | Warning, |
38 | Progress, | 38 | Progress, |
39 | Inspection, | 39 | Inspection, |
40 | RevisionUpdate | 40 | RevisionUpdate, |
41 | FlushCompletion | ||
41 | }; | 42 | }; |
42 | enum InspectionCode { | 43 | enum InspectionCode { |
43 | Success, | 44 | Success = 0, |
44 | Failure | 45 | Failure |
45 | }; | 46 | }; |
46 | 47 | ||
47 | QByteArray id; | 48 | QByteArray id; |
48 | int type; | 49 | int type = 0; |
49 | QString message; | 50 | QString message; |
50 | int code; | 51 | //A return code. Zero typically indicates success. |
52 | int code = 0; | ||
51 | }; | 53 | }; |
52 | } | 54 | } |
53 | 55 | ||
diff --git a/common/query.h b/common/query.h index 0bc5141..2adb7e9 100644 --- a/common/query.h +++ b/common/query.h | |||
@@ -465,6 +465,12 @@ class SyncScope : public QueryBase { | |||
465 | public: | 465 | public: |
466 | using QueryBase::QueryBase; | 466 | using QueryBase::QueryBase; |
467 | 467 | ||
468 | SyncScope(const QueryBase &other) | ||
469 | : QueryBase(other) | ||
470 | { | ||
471 | |||
472 | } | ||
473 | |||
468 | Query::Filter getResourceFilter() const | 474 | Query::Filter getResourceFilter() const |
469 | { | 475 | { |
470 | return mResourceFilter; | 476 | return mResourceFilter; |
diff --git a/common/queuedcommand.fbs b/common/queuedcommand.fbs index 06226d3..114e2cd 100644 --- a/common/queuedcommand.fbs +++ b/common/queuedcommand.fbs | |||
@@ -3,9 +3,6 @@ namespace Sink; | |||
3 | table QueuedCommand { | 3 | table QueuedCommand { |
4 | commandId: int; | 4 | commandId: int; |
5 | command: [ubyte]; | 5 | command: [ubyte]; |
6 | // entityId: string; | ||
7 | // sourceRevision: ulong; | ||
8 | // targetRevision: [ubyte]; | ||
9 | } | 6 | } |
10 | 7 | ||
11 | root_type QueuedCommand; | 8 | root_type QueuedCommand; |
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 822b5cd..b46e8b2 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -31,6 +31,7 @@ | |||
31 | #include "common/deleteentity_generated.h" | 31 | #include "common/deleteentity_generated.h" |
32 | #include "common/revisionreplayed_generated.h" | 32 | #include "common/revisionreplayed_generated.h" |
33 | #include "common/inspection_generated.h" | 33 | #include "common/inspection_generated.h" |
34 | #include "common/flush_generated.h" | ||
34 | #include "common/entitybuffer.h" | 35 | #include "common/entitybuffer.h" |
35 | #include "common/bufferutils.h" | 36 | #include "common/bufferutils.h" |
36 | #include "common/test.h" | 37 | #include "common/test.h" |
@@ -291,16 +292,6 @@ KAsync::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu | |||
291 | }); | 292 | }); |
292 | } | 293 | } |
293 | 294 | ||
294 | KAsync::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool localSync) | ||
295 | { | ||
296 | SinkTrace() << "Sending synchronize command: " << sourceSync << localSync; | ||
297 | flatbuffers::FlatBufferBuilder fbb; | ||
298 | auto command = Sink::Commands::CreateSynchronize(fbb, sourceSync, localSync); | ||
299 | Sink::Commands::FinishSynchronizeBuffer(fbb, command); | ||
300 | open(); | ||
301 | return sendCommand(Commands::SynchronizeCommand, fbb); | ||
302 | } | ||
303 | |||
304 | KAsync::Job<void> ResourceAccess::synchronizeResource(const Sink::QueryBase &query) | 295 | KAsync::Job<void> ResourceAccess::synchronizeResource(const Sink::QueryBase &query) |
305 | { | 296 | { |
306 | flatbuffers::FlatBufferBuilder fbb; | 297 | flatbuffers::FlatBufferBuilder fbb; |
@@ -311,8 +302,6 @@ KAsync::Job<void> ResourceAccess::synchronizeResource(const Sink::QueryBase &que | |||
311 | } | 302 | } |
312 | auto q = fbb.CreateString(queryString.toStdString()); | 303 | auto q = fbb.CreateString(queryString.toStdString()); |
313 | auto builder = Sink::Commands::SynchronizeBuilder(fbb); | 304 | auto builder = Sink::Commands::SynchronizeBuilder(fbb); |
314 | builder.add_sourceSync(true); | ||
315 | builder.add_localSync(false); | ||
316 | builder.add_query(q); | 305 | builder.add_query(q); |
317 | Sink::Commands::FinishSynchronizeBuffer(fbb, builder.Finish()); | 306 | Sink::Commands::FinishSynchronizeBuffer(fbb, builder.Finish()); |
318 | 307 | ||
@@ -390,6 +379,16 @@ ResourceAccess::sendInspectionCommand(int inspectionType, const QByteArray &insp | |||
390 | return sendCommand(Sink::Commands::InspectionCommand, fbb); | 379 | return sendCommand(Sink::Commands::InspectionCommand, fbb); |
391 | } | 380 | } |
392 | 381 | ||
382 | KAsync::Job<void> ResourceAccess::sendFlushCommand(int flushType, const QByteArray &flushId) | ||
383 | { | ||
384 | flatbuffers::FlatBufferBuilder fbb; | ||
385 | auto id = fbb.CreateString(flushId.toStdString()); | ||
386 | auto location = Sink::Commands::CreateFlush(fbb, id, flushType); | ||
387 | Sink::Commands::FinishFlushBuffer(fbb, location); | ||
388 | open(); | ||
389 | return sendCommand(Sink::Commands::FlushCommand, fbb); | ||
390 | } | ||
391 | |||
393 | void ResourceAccess::open() | 392 | void ResourceAccess::open() |
394 | { | 393 | { |
395 | if (d->socket && d->socket->isValid()) { | 394 | if (d->socket && d->socket->isValid()) { |
@@ -613,6 +612,8 @@ bool ResourceAccess::processMessageBuffer() | |||
613 | [[clang::fallthrough]]; | 612 | [[clang::fallthrough]]; |
614 | case Sink::Notification::Warning: | 613 | case Sink::Notification::Warning: |
615 | [[clang::fallthrough]]; | 614 | [[clang::fallthrough]]; |
615 | case Sink::Notification::FlushCompletion: | ||
616 | [[clang::fallthrough]]; | ||
616 | case Sink::Notification::Progress: { | 617 | case Sink::Notification::Progress: { |
617 | auto n = getNotification(buffer); | 618 | auto n = getNotification(buffer); |
618 | SinkTrace() << "Received notification: " << n.type; | 619 | SinkTrace() << "Received notification: " << n.type; |
diff --git a/common/resourceaccess.h b/common/resourceaccess.h index 755c8a7..4229161 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h | |||
@@ -29,6 +29,7 @@ | |||
29 | 29 | ||
30 | #include <flatbuffers/flatbuffers.h> | 30 | #include <flatbuffers/flatbuffers.h> |
31 | #include "notification.h" | 31 | #include "notification.h" |
32 | #include "flush.h" | ||
32 | #include "query.h" | 33 | #include "query.h" |
33 | #include "log.h" | 34 | #include "log.h" |
34 | 35 | ||
@@ -50,7 +51,6 @@ public: | |||
50 | } | 51 | } |
51 | virtual KAsync::Job<void> sendCommand(int commandId) = 0; | 52 | virtual KAsync::Job<void> sendCommand(int commandId) = 0; |
52 | virtual KAsync::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) = 0; | 53 | virtual KAsync::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) = 0; |
53 | virtual KAsync::Job<void> synchronizeResource(bool remoteSync, bool localSync) = 0; | ||
54 | virtual KAsync::Job<void> synchronizeResource(const Sink::QueryBase &filter) = 0; | 54 | virtual KAsync::Job<void> synchronizeResource(const Sink::QueryBase &filter) = 0; |
55 | 55 | ||
56 | virtual KAsync::Job<void> sendCreateCommand(const QByteArray &uid, const QByteArray &resourceBufferType, const QByteArray &buffer) | 56 | virtual KAsync::Job<void> sendCreateCommand(const QByteArray &uid, const QByteArray &resourceBufferType, const QByteArray &buffer) |
@@ -75,6 +75,11 @@ public: | |||
75 | return KAsync::null<void>(); | 75 | return KAsync::null<void>(); |
76 | }; | 76 | }; |
77 | 77 | ||
78 | virtual KAsync::Job<void> sendFlushCommand(int flushType, const QByteArray &flushId) | ||
79 | { | ||
80 | return KAsync::null<void>(); | ||
81 | } | ||
82 | |||
78 | int getResourceStatus() const | 83 | int getResourceStatus() const |
79 | { | 84 | { |
80 | return mResourceStatus; | 85 | return mResourceStatus; |
@@ -108,7 +113,6 @@ public: | |||
108 | 113 | ||
109 | KAsync::Job<void> sendCommand(int commandId) Q_DECL_OVERRIDE; | 114 | KAsync::Job<void> sendCommand(int commandId) Q_DECL_OVERRIDE; |
110 | KAsync::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) Q_DECL_OVERRIDE; | 115 | KAsync::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) Q_DECL_OVERRIDE; |
111 | KAsync::Job<void> synchronizeResource(bool remoteSync, bool localSync) Q_DECL_OVERRIDE; | ||
112 | KAsync::Job<void> synchronizeResource(const Sink::QueryBase &filter) Q_DECL_OVERRIDE; | 116 | KAsync::Job<void> synchronizeResource(const Sink::QueryBase &filter) Q_DECL_OVERRIDE; |
113 | KAsync::Job<void> sendCreateCommand(const QByteArray &uid, const QByteArray &resourceBufferType, const QByteArray &buffer) Q_DECL_OVERRIDE; | 117 | KAsync::Job<void> sendCreateCommand(const QByteArray &uid, const QByteArray &resourceBufferType, const QByteArray &buffer) Q_DECL_OVERRIDE; |
114 | KAsync::Job<void> | 118 | KAsync::Job<void> |
@@ -117,6 +121,7 @@ public: | |||
117 | KAsync::Job<void> sendRevisionReplayedCommand(qint64 revision) Q_DECL_OVERRIDE; | 121 | KAsync::Job<void> sendRevisionReplayedCommand(qint64 revision) Q_DECL_OVERRIDE; |
118 | KAsync::Job<void> | 122 | KAsync::Job<void> |
119 | sendInspectionCommand(int inspectionType,const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expecedValue) Q_DECL_OVERRIDE; | 123 | sendInspectionCommand(int inspectionType,const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expecedValue) Q_DECL_OVERRIDE; |
124 | KAsync::Job<void> sendFlushCommand(int flushType, const QByteArray &flushId) Q_DECL_OVERRIDE; | ||
120 | /** | 125 | /** |
121 | * Tries to connect to server, and returns a connected socket on success. | 126 | * Tries to connect to server, and returns a connected socket on success. |
122 | */ | 127 | */ |
diff --git a/common/resourcecontrol.cpp b/common/resourcecontrol.cpp index 3568844..af98b8b 100644 --- a/common/resourcecontrol.cpp +++ b/common/resourcecontrol.cpp | |||
@@ -85,17 +85,39 @@ KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArrayList &resou | |||
85 | SinkTrace() << "flushMessageQueue" << resourceIdentifier; | 85 | SinkTrace() << "flushMessageQueue" << resourceIdentifier; |
86 | return KAsync::value(resourceIdentifier) | 86 | return KAsync::value(resourceIdentifier) |
87 | .template each([](const QByteArray &resource) { | 87 | .template each([](const QByteArray &resource) { |
88 | SinkTrace() << "Flushing message queue " << resource; | 88 | return flushMessageQueue(resource); |
89 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); | ||
90 | resourceAccess->open(); | ||
91 | return resourceAccess->synchronizeResource(false, true) | ||
92 | .addToContext(resourceAccess); | ||
93 | }); | 89 | }); |
94 | } | 90 | } |
95 | 91 | ||
96 | KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArray &resourceIdentifier) | 92 | KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArray &resourceIdentifier) |
97 | { | 93 | { |
98 | return flushMessageQueue(QByteArrayList() << resourceIdentifier); | 94 | return flush(Flush::FlushUserQueue, resourceIdentifier).then(flush(Flush::FlushSynchronization, resourceIdentifier)); |
95 | } | ||
96 | |||
97 | KAsync::Job<void> ResourceControl::flush(Flush::FlushType type, const QByteArray &resourceIdentifier) | ||
98 | { | ||
99 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(resourceIdentifier, ResourceConfig::getResourceType(resourceIdentifier)); | ||
100 | auto notifier = QSharedPointer<Sink::Notifier>::create(resourceAccess); | ||
101 | auto id = QUuid::createUuid().toByteArray(); | ||
102 | return KAsync::start<void>([=](KAsync::Future<void> &future) { | ||
103 | SinkTrace() << "Waiting for notification notification " << id; | ||
104 | notifier->registerHandler([&future, id](const Notification ¬ification) { | ||
105 | SinkTrace() << "Received notification " << notification.type << notification.id; | ||
106 | if (notification.id == id) { | ||
107 | SinkTrace() << "FlushComplete"; | ||
108 | if (notification.code) { | ||
109 | SinkWarning() << "Flush return an error"; | ||
110 | future.setError(-1, "Flush returned an error: " + notification.message); | ||
111 | } else { | ||
112 | future.setFinished(); | ||
113 | } | ||
114 | } | ||
115 | }); | ||
116 | resourceAccess->sendFlushCommand(type, id).onError([&future] (const KAsync::Error &error) { | ||
117 | SinkWarning() << "Failed to send command"; | ||
118 | future.setError(1, "Failed to send command: " + error.errorMessage); | ||
119 | }).exec(); | ||
120 | }); | ||
99 | } | 121 | } |
100 | 122 | ||
101 | KAsync::Job<void> ResourceControl::flushReplayQueue(const QByteArrayList &resourceIdentifier) | 123 | KAsync::Job<void> ResourceControl::flushReplayQueue(const QByteArrayList &resourceIdentifier) |
diff --git a/common/resourcecontrol.h b/common/resourcecontrol.h index 9e603e4..b910441 100644 --- a/common/resourcecontrol.h +++ b/common/resourcecontrol.h | |||
@@ -26,6 +26,7 @@ | |||
26 | #include <Async/Async> | 26 | #include <Async/Async> |
27 | 27 | ||
28 | #include "inspection.h" | 28 | #include "inspection.h" |
29 | #include "flush.h" | ||
29 | 30 | ||
30 | namespace Sink { | 31 | namespace Sink { |
31 | namespace ResourceControl { | 32 | namespace ResourceControl { |
@@ -58,5 +59,8 @@ KAsync::Job<void> SINK_EXPORT flushMessageQueue(const QByteArray &resourceIdenti | |||
58 | */ | 59 | */ |
59 | KAsync::Job<void> SINK_EXPORT flushReplayQueue(const QByteArrayList &resourceIdentifier); | 60 | KAsync::Job<void> SINK_EXPORT flushReplayQueue(const QByteArrayList &resourceIdentifier); |
60 | KAsync::Job<void> SINK_EXPORT flushReplayQueue(const QByteArray &resourceIdentifier); | 61 | KAsync::Job<void> SINK_EXPORT flushReplayQueue(const QByteArray &resourceIdentifier); |
62 | |||
63 | KAsync::Job<void> SINK_EXPORT flush(Flush::FlushType, const QByteArray &resourceIdentifier); | ||
64 | |||
61 | } | 65 | } |
62 | } | 66 | } |
diff --git a/common/store.cpp b/common/store.cpp index 6aae00f..8b8de1f 100644 --- a/common/store.cpp +++ b/common/store.cpp | |||
@@ -255,23 +255,7 @@ KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier) | |||
255 | 255 | ||
256 | KAsync::Job<void> Store::synchronize(const Sink::Query &query) | 256 | KAsync::Job<void> Store::synchronize(const Sink::Query &query) |
257 | { | 257 | { |
258 | auto resources = getResources(query.getResourceFilter()).keys(); | 258 | return synchronize(Sink::SyncScope{static_cast<Sink::QueryBase>(query)}); |
259 | SinkTrace() << "synchronize" << resources; | ||
260 | return KAsync::value(resources) | ||
261 | .template each([query](const QByteArray &resource) { | ||
262 | SinkTrace() << "Synchronizing " << resource; | ||
263 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); | ||
264 | return resourceAccess->synchronizeResource(true, false) | ||
265 | .addToContext(resourceAccess) | ||
266 | .then<void>([](const KAsync::Error &error) { | ||
267 | if (error) { | ||
268 | SinkWarning() << "Error during sync."; | ||
269 | return KAsync::error<void>(error); | ||
270 | } | ||
271 | SinkTrace() << "synced."; | ||
272 | return KAsync::null<void>(); | ||
273 | }); | ||
274 | }); | ||
275 | } | 259 | } |
276 | 260 | ||
277 | KAsync::Job<void> Store::synchronize(const Sink::SyncScope &scope) | 261 | KAsync::Job<void> Store::synchronize(const Sink::SyncScope &scope) |
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 5bde597..f7dd816 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp | |||
@@ -27,6 +27,8 @@ | |||
27 | #include "createentity_generated.h" | 27 | #include "createentity_generated.h" |
28 | #include "modifyentity_generated.h" | 28 | #include "modifyentity_generated.h" |
29 | #include "deleteentity_generated.h" | 29 | #include "deleteentity_generated.h" |
30 | #include "flush_generated.h" | ||
31 | #include "notification_generated.h" | ||
30 | 32 | ||
31 | SINK_DEBUG_AREA("synchronizer") | 33 | SINK_DEBUG_AREA("synchronizer") |
32 | 34 | ||
@@ -263,6 +265,13 @@ KAsync::Job<void> Synchronizer::synchronize(const Sink::QueryBase &query) | |||
263 | return processSyncQueue(); | 265 | return processSyncQueue(); |
264 | } | 266 | } |
265 | 267 | ||
268 | void Synchronizer::flush(int commandId, const QByteArray &flushId) | ||
269 | { | ||
270 | SinkTrace() << "Flushing the synchronization queue"; | ||
271 | mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::Flush, commandId, flushId}; | ||
272 | processSyncQueue().exec(); | ||
273 | } | ||
274 | |||
266 | KAsync::Job<void> Synchronizer::processSyncQueue() | 275 | KAsync::Job<void> Synchronizer::processSyncQueue() |
267 | { | 276 | { |
268 | if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { | 277 | if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { |
@@ -279,6 +288,20 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
279 | //Commit after every request, so implementations only have to commit more if they add a lot of data. | 288 | //Commit after every request, so implementations only have to commit more if they add a lot of data. |
280 | commit(); | 289 | commit(); |
281 | }); | 290 | }); |
291 | } else if (request.requestType == Synchronizer::SyncRequest::Flush) { | ||
292 | if (request.flushType == Flush::FlushReplayQueue) { | ||
293 | SinkTrace() << "Emitting flush completion."; | ||
294 | Sink::Notification n; | ||
295 | n.type = Sink::Notification::FlushCompletion; | ||
296 | n.id = request.flushId; | ||
297 | emit notify(n); | ||
298 | } else { | ||
299 | flatbuffers::FlatBufferBuilder fbb; | ||
300 | auto flushId = fbb.CreateString(request.flushId); | ||
301 | auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization)); | ||
302 | Sink::Commands::FinishFlushBuffer(fbb, location); | ||
303 | enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); | ||
304 | } | ||
282 | } else { | 305 | } else { |
283 | job = replayNextRevision(); | 306 | job = replayNextRevision(); |
284 | } | 307 | } |
diff --git a/common/synchronizer.h b/common/synchronizer.h index 4d5bdd5..99d4877 100644 --- a/common/synchronizer.h +++ b/common/synchronizer.h | |||
@@ -45,6 +45,7 @@ public: | |||
45 | 45 | ||
46 | void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback, MessageQueue &messageQueue); | 46 | void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback, MessageQueue &messageQueue); |
47 | KAsync::Job<void> synchronize(const Sink::QueryBase &query); | 47 | KAsync::Job<void> synchronize(const Sink::QueryBase &query); |
48 | void flush(int commandId, const QByteArray &flushId); | ||
48 | 49 | ||
49 | //Read only access to main storage | 50 | //Read only access to main storage |
50 | Storage::EntityStore &store(); | 51 | Storage::EntityStore &store(); |
@@ -57,6 +58,9 @@ public: | |||
57 | 58 | ||
58 | bool allChangesReplayed() Q_DECL_OVERRIDE; | 59 | bool allChangesReplayed() Q_DECL_OVERRIDE; |
59 | 60 | ||
61 | signals: | ||
62 | void notify(Notification); | ||
63 | |||
60 | public slots: | 64 | public slots: |
61 | virtual void revisionChanged() Q_DECL_OVERRIDE; | 65 | virtual void revisionChanged() Q_DECL_OVERRIDE; |
62 | 66 | ||
@@ -115,23 +119,30 @@ protected: | |||
115 | struct SyncRequest { | 119 | struct SyncRequest { |
116 | enum RequestType { | 120 | enum RequestType { |
117 | Synchronization, | 121 | Synchronization, |
118 | ChangeReplay | 122 | ChangeReplay, |
123 | Flush | ||
119 | }; | 124 | }; |
120 | 125 | ||
121 | SyncRequest(const Sink::QueryBase &q) | 126 | SyncRequest(const Sink::QueryBase &q) |
122 | : flushQueue(false), | 127 | : requestType(Synchronization), |
123 | requestType(Synchronization), | ||
124 | query(q) | 128 | query(q) |
125 | { | 129 | { |
126 | } | 130 | } |
127 | 131 | ||
128 | SyncRequest(RequestType type) | 132 | SyncRequest(RequestType type) |
129 | : flushQueue(false), | 133 | : requestType(type) |
134 | { | ||
135 | } | ||
136 | |||
137 | SyncRequest(RequestType type, int flushType_, const QByteArray &flushId_) | ||
138 | : flushType(flushType_), | ||
139 | flushId(flushId_), | ||
130 | requestType(type) | 140 | requestType(type) |
131 | { | 141 | { |
132 | } | 142 | } |
133 | 143 | ||
134 | bool flushQueue; | 144 | int flushType = 0; |
145 | QByteArray flushId; | ||
135 | RequestType requestType; | 146 | RequestType requestType; |
136 | Sink::QueryBase query; | 147 | Sink::QueryBase query; |
137 | }; | 148 | }; |
diff --git a/tests/testimplementations.h b/tests/testimplementations.h index 111c884..6fe08f7 100644 --- a/tests/testimplementations.h +++ b/tests/testimplementations.h | |||
@@ -66,10 +66,6 @@ public: | |||
66 | { | 66 | { |
67 | return KAsync::null<void>(); | 67 | return KAsync::null<void>(); |
68 | } | 68 | } |
69 | KAsync::Job<void> synchronizeResource(bool remoteSync, bool localSync) Q_DECL_OVERRIDE | ||
70 | { | ||
71 | return KAsync::null<void>(); | ||
72 | } | ||
73 | KAsync::Job<void> synchronizeResource(const Sink::QueryBase &) Q_DECL_OVERRIDE | 69 | KAsync::Job<void> synchronizeResource(const Sink::QueryBase &) Q_DECL_OVERRIDE |
74 | { | 70 | { |
75 | return KAsync::null<void>(); | 71 | return KAsync::null<void>(); |