summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/CMakeLists.txt2
-rw-r--r--common/commands.cpp2
-rw-r--r--common/commands.h1
-rw-r--r--common/commands/flush.fbs8
-rw-r--r--common/commands/synchronize.fbs2
-rw-r--r--common/flush.h45
-rw-r--r--common/genericresource.cpp57
-rw-r--r--common/genericresource.h1
-rw-r--r--common/listener.cpp19
-rw-r--r--common/notification.h10
-rw-r--r--common/query.h6
-rw-r--r--common/queuedcommand.fbs3
-rw-r--r--common/resourceaccess.cpp25
-rw-r--r--common/resourceaccess.h9
-rw-r--r--common/resourcecontrol.cpp34
-rw-r--r--common/resourcecontrol.h4
-rw-r--r--common/store.cpp18
-rw-r--r--common/synchronizer.cpp23
-rw-r--r--common/synchronizer.h21
19 files changed, 227 insertions, 63 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index 5ba524b..8a16af4 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -95,6 +95,7 @@ generate_flatbuffers(
95 commands/notification 95 commands/notification
96 commands/revisionreplayed 96 commands/revisionreplayed
97 commands/inspection 97 commands/inspection
98 commands/flush
98 domain/event 99 domain/event
99 domain/mail 100 domain/mail
100 domain/folder 101 domain/folder
@@ -130,6 +131,7 @@ install(FILES
130 bufferadaptor.h 131 bufferadaptor.h
131 test.h 132 test.h
132 log.h 133 log.h
134 flush.h
133 ${CMAKE_CURRENT_BINARY_DIR}/sink_export.h 135 ${CMAKE_CURRENT_BINARY_DIR}/sink_export.h
134 DESTINATION ${INCLUDE_INSTALL_DIR}/${PROJECT_NAME} COMPONENT Devel 136 DESTINATION ${INCLUDE_INSTALL_DIR}/${PROJECT_NAME} COMPONENT Devel
135) 137)
diff --git a/common/commands.cpp b/common/commands.cpp
index 91657b8..c0781f6 100644
--- a/common/commands.cpp
+++ b/common/commands.cpp
@@ -61,6 +61,8 @@ QByteArray name(int commandId)
61 return "Inspection"; 61 return "Inspection";
62 case RemoveFromDiskCommand: 62 case RemoveFromDiskCommand:
63 return "RemoveFromDisk"; 63 return "RemoveFromDisk";
64 case FlushCommand:
65 return "Flush";
64 case CustomCommand: 66 case CustomCommand:
65 return "Custom"; 67 return "Custom";
66 }; 68 };
diff --git a/common/commands.h b/common/commands.h
index b97bbc6..0da1b3c 100644
--- a/common/commands.h
+++ b/common/commands.h
@@ -48,6 +48,7 @@ enum CommandIds
48 RevisionReplayedCommand, 48 RevisionReplayedCommand,
49 InspectionCommand, 49 InspectionCommand,
50 RemoveFromDiskCommand, 50 RemoveFromDiskCommand,
51 FlushCommand,
51 CustomCommand = 0xffff 52 CustomCommand = 0xffff
52}; 53};
53 54
diff --git a/common/commands/flush.fbs b/common/commands/flush.fbs
new file mode 100644
index 0000000..179f760
--- /dev/null
+++ b/common/commands/flush.fbs
@@ -0,0 +1,8 @@
1namespace Sink.Commands;
2
3table Flush {
4 id: string;
5 type: int; //See flush.h
6}
7
8root_type Flush;
diff --git a/common/commands/synchronize.fbs b/common/commands/synchronize.fbs
index 62f4b2b..7b32305 100644
--- a/common/commands/synchronize.fbs
+++ b/common/commands/synchronize.fbs
@@ -1,8 +1,6 @@
1namespace Sink.Commands; 1namespace Sink.Commands;
2 2
3table Synchronize { 3table Synchronize {
4 sourceSync: bool; //Synchronize with source
5 localSync: bool; //Ensure all queues are processed so the local state is up-to date.
6 query: string; 4 query: string;
7} 5}
8 6
diff --git a/common/flush.h b/common/flush.h
new file mode 100644
index 0000000..3f04608
--- /dev/null
+++ b/common/flush.h
@@ -0,0 +1,45 @@
1/*
2 * Copyright (c) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#pragma once
21
22#include "sink_export.h"
23
24namespace Sink {
25namespace Flush {
26
27enum FlushType {
28 /**
29 * Guarantees that any commands issued before this flush are written back to the server once this flush completes.
30 * Note that this does not guarantee the success of writeback, only that an attempt has been made.
31 */
32 FlushReplayQueue,
33 /**
34 * Guarantees that any synchronization request issued before this flush has been executed and that all entities created by it have been processed once this flush completes.
35 */
36 FlushSynchronization,
37 /**
38 * Guarantees that any modification issued before this flush has been processed once this flush completes.
39 */
40 FlushUserQueue
41};
42
43}
44}
45
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 7c4d4ea..7b83957 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -27,6 +27,7 @@
27#include "deleteentity_generated.h" 27#include "deleteentity_generated.h"
28#include "inspection_generated.h" 28#include "inspection_generated.h"
29#include "notification_generated.h" 29#include "notification_generated.h"
30#include "flush_generated.h"
30#include "domainadaptor.h" 31#include "domainadaptor.h"
31#include "commands.h" 32#include "commands.h"
32#include "index.h" 33#include "index.h"
@@ -54,6 +55,7 @@ class CommandProcessor : public QObject
54{ 55{
55 Q_OBJECT 56 Q_OBJECT
56 typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction; 57 typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction;
58 typedef std::function<KAsync::Job<void>(void const *, size_t)> FlushFunction;
57 SINK_DEBUG_AREA("commandprocessor") 59 SINK_DEBUG_AREA("commandprocessor")
58 60
59public: 61public:
@@ -75,6 +77,11 @@ public:
75 mInspect = f; 77 mInspect = f;
76 } 78 }
77 79
80 void setFlushCommand(const FlushFunction &f)
81 {
82 mFlush = f;
83 }
84
78signals: 85signals:
79 void error(int errorCode, const QString &errorMessage); 86 void error(int errorCode, const QString &errorMessage);
80 87
@@ -124,6 +131,13 @@ private slots:
124 } else { 131 } else {
125 return KAsync::error<qint64>(-1, "Missing inspection command."); 132 return KAsync::error<qint64>(-1, "Missing inspection command.");
126 } 133 }
134 case Sink::Commands::FlushCommand:
135 if (mFlush) {
136 return mFlush(queuedCommand->command()->Data(), queuedCommand->command()->size())
137 .syncThen<qint64>([]() { return -1; });
138 } else {
139 return KAsync::error<qint64>(-1, "Missing inspection command.");
140 }
127 default: 141 default:
128 return KAsync::error<qint64>(-1, "Unhandled command"); 142 return KAsync::error<qint64>(-1, "Unhandled command");
129 } 143 }
@@ -219,6 +233,7 @@ private:
219 // The lowest revision we no longer need 233 // The lowest revision we no longer need
220 qint64 mLowerBoundRevision; 234 qint64 mLowerBoundRevision;
221 InspectionFunction mInspect; 235 InspectionFunction mInspect;
236 FlushFunction mFlush;
222}; 237};
223 238
224GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline ) 239GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline )
@@ -266,6 +281,26 @@ GenericResource::GenericResource(const ResourceContext &resourceContext, const Q
266 } 281 }
267 return KAsync::error<void>(-1, "Invalid inspection command."); 282 return KAsync::error<void>(-1, "Invalid inspection command.");
268 }); 283 });
284 mProcessor->setFlushCommand([this](void const *command, size_t size) {
285 flatbuffers::Verifier verifier((const uint8_t *)command, size);
286 if (Sink::Commands::VerifyFlushBuffer(verifier)) {
287 auto buffer = Sink::Commands::GetFlush(command);
288 const auto flushType = buffer->type();
289 const auto flushId = BufferUtils::extractBuffer(buffer->id());
290 if (flushType == Sink::Flush::FlushReplayQueue) {
291 SinkTrace() << "Flushing synchronizer ";
292 mSynchronizer->flush(flushType, flushId);
293 } else {
294 SinkTrace() << "Emitting flush completion" << flushId;
295 Sink::Notification n;
296 n.type = Sink::Notification::FlushCompletion;
297 n.id = flushId;
298 emit notify(n);
299 }
300 return KAsync::null<void>();
301 }
302 return KAsync::error<void>(-1, "Invalid flush command.");
303 });
269 { 304 {
270 auto ret = QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 305 auto ret = QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
271 Q_ASSERT(ret); 306 Q_ASSERT(ret);
@@ -371,6 +406,10 @@ void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByt
371 406
372void GenericResource::processCommand(int commandId, const QByteArray &data) 407void GenericResource::processCommand(int commandId, const QByteArray &data)
373{ 408{
409 if (commandId == Commands::FlushCommand) {
410 processFlushCommand(data);
411 return;
412 }
374 static int modifications = 0; 413 static int modifications = 0;
375 mUserQueue.startTransaction(); 414 mUserQueue.startTransaction();
376 enqueueCommand(mUserQueue, commandId, data); 415 enqueueCommand(mUserQueue, commandId, data);
@@ -384,6 +423,24 @@ void GenericResource::processCommand(int commandId, const QByteArray &data)
384 } 423 }
385} 424}
386 425
426void GenericResource::processFlushCommand(const QByteArray &data)
427{
428 flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size());
429 if (Sink::Commands::VerifyFlushBuffer(verifier)) {
430 auto buffer = Sink::Commands::GetFlush(data.constData());
431 const auto flushType = buffer->type();
432 const auto flushId = BufferUtils::extractBuffer(buffer->id());
433 if (flushType == Sink::Flush::FlushSynchronization) {
434 mSynchronizer->flush(flushType, flushId);
435 } else {
436 mUserQueue.startTransaction();
437 enqueueCommand(mUserQueue, Commands::FlushCommand, data);
438 mUserQueue.commit();
439 }
440 }
441
442}
443
387KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase &query) 444KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase &query)
388{ 445{
389 return KAsync::start<void>([this, query] { 446 return KAsync::start<void>([this, query] {
diff --git a/common/genericresource.h b/common/genericresource.h
index 3f92e93..9447c8b 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -47,6 +47,7 @@ public:
47 virtual ~GenericResource(); 47 virtual ~GenericResource();
48 48
49 virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; 49 virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE;
50 virtual void processFlushCommand(const QByteArray &data);
50 virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) Q_DECL_OVERRIDE; 51 virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) Q_DECL_OVERRIDE;
51 virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE; 52 virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE;
52 virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE; 53 virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE;
diff --git a/common/listener.cpp b/common/listener.cpp
index c3c6bc2..2ab0333 100644
--- a/common/listener.cpp
+++ b/common/listener.cpp
@@ -33,7 +33,6 @@
33#include "common/synchronize_generated.h" 33#include "common/synchronize_generated.h"
34#include "common/notification_generated.h" 34#include "common/notification_generated.h"
35#include "common/revisionreplayed_generated.h" 35#include "common/revisionreplayed_generated.h"
36#include "common/inspection_generated.h"
37 36
38#include <QLocalServer> 37#include <QLocalServer>
39#include <QLocalSocket> 38#include <QLocalSocket>
@@ -244,18 +243,13 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
244 auto timer = QSharedPointer<QTime>::create(); 243 auto timer = QSharedPointer<QTime>::create();
245 timer->start(); 244 timer->start();
246 auto job = KAsync::null<void>(); 245 auto job = KAsync::null<void>();
247 if (buffer->sourceSync()) { 246 Sink::QueryBase query;
248 Sink::QueryBase query; 247 if (buffer->query()) {
249 if (buffer->query()) { 248 auto data = QByteArray::fromStdString(buffer->query()->str());
250 auto data = QByteArray::fromStdString(buffer->query()->str()); 249 QDataStream stream(&data, QIODevice::ReadOnly);
251 QDataStream stream(&data, QIODevice::ReadOnly); 250 stream >> query;
252 stream >> query;
253 }
254 job = loadResource().synchronizeWithSource(query);
255 }
256 if (buffer->localSync()) {
257 job = job.then<void>(loadResource().processAllMessages());
258 } 251 }
252 job = loadResource().synchronizeWithSource(query);
259 job.then<void>([callback, timer](const KAsync::Error &error) { 253 job.then<void>([callback, timer](const KAsync::Error &error) {
260 if (error) { 254 if (error) {
261 SinkWarning() << "Sync failed: " << error.errorMessage; 255 SinkWarning() << "Sync failed: " << error.errorMessage;
@@ -279,6 +273,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
279 case Sink::Commands::DeleteEntityCommand: 273 case Sink::Commands::DeleteEntityCommand:
280 case Sink::Commands::ModifyEntityCommand: 274 case Sink::Commands::ModifyEntityCommand:
281 case Sink::Commands::CreateEntityCommand: 275 case Sink::Commands::CreateEntityCommand:
276 case Sink::Commands::FlushCommand:
282 SinkTrace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name; 277 SinkTrace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name;
283 loadResource().processCommand(commandId, commandBuffer); 278 loadResource().processCommand(commandId, commandBuffer);
284 break; 279 break;
diff --git a/common/notification.h b/common/notification.h
index dcf00a3..b1bd290 100644
--- a/common/notification.h
+++ b/common/notification.h
@@ -37,17 +37,19 @@ public:
37 Warning, 37 Warning,
38 Progress, 38 Progress,
39 Inspection, 39 Inspection,
40 RevisionUpdate 40 RevisionUpdate,
41 FlushCompletion
41 }; 42 };
42 enum InspectionCode { 43 enum InspectionCode {
43 Success, 44 Success = 0,
44 Failure 45 Failure
45 }; 46 };
46 47
47 QByteArray id; 48 QByteArray id;
48 int type; 49 int type = 0;
49 QString message; 50 QString message;
50 int code; 51 //A return code. Zero typically indicates success.
52 int code = 0;
51}; 53};
52} 54}
53 55
diff --git a/common/query.h b/common/query.h
index 0bc5141..2adb7e9 100644
--- a/common/query.h
+++ b/common/query.h
@@ -465,6 +465,12 @@ class SyncScope : public QueryBase {
465public: 465public:
466 using QueryBase::QueryBase; 466 using QueryBase::QueryBase;
467 467
468 SyncScope(const QueryBase &other)
469 : QueryBase(other)
470 {
471
472 }
473
468 Query::Filter getResourceFilter() const 474 Query::Filter getResourceFilter() const
469 { 475 {
470 return mResourceFilter; 476 return mResourceFilter;
diff --git a/common/queuedcommand.fbs b/common/queuedcommand.fbs
index 06226d3..114e2cd 100644
--- a/common/queuedcommand.fbs
+++ b/common/queuedcommand.fbs
@@ -3,9 +3,6 @@ namespace Sink;
3table QueuedCommand { 3table QueuedCommand {
4 commandId: int; 4 commandId: int;
5 command: [ubyte]; 5 command: [ubyte];
6 // entityId: string;
7 // sourceRevision: ulong;
8 // targetRevision: [ubyte];
9} 6}
10 7
11root_type QueuedCommand; 8root_type QueuedCommand;
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 822b5cd..b46e8b2 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -31,6 +31,7 @@
31#include "common/deleteentity_generated.h" 31#include "common/deleteentity_generated.h"
32#include "common/revisionreplayed_generated.h" 32#include "common/revisionreplayed_generated.h"
33#include "common/inspection_generated.h" 33#include "common/inspection_generated.h"
34#include "common/flush_generated.h"
34#include "common/entitybuffer.h" 35#include "common/entitybuffer.h"
35#include "common/bufferutils.h" 36#include "common/bufferutils.h"
36#include "common/test.h" 37#include "common/test.h"
@@ -291,16 +292,6 @@ KAsync::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu
291 }); 292 });
292} 293}
293 294
294KAsync::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool localSync)
295{
296 SinkTrace() << "Sending synchronize command: " << sourceSync << localSync;
297 flatbuffers::FlatBufferBuilder fbb;
298 auto command = Sink::Commands::CreateSynchronize(fbb, sourceSync, localSync);
299 Sink::Commands::FinishSynchronizeBuffer(fbb, command);
300 open();
301 return sendCommand(Commands::SynchronizeCommand, fbb);
302}
303
304KAsync::Job<void> ResourceAccess::synchronizeResource(const Sink::QueryBase &query) 295KAsync::Job<void> ResourceAccess::synchronizeResource(const Sink::QueryBase &query)
305{ 296{
306 flatbuffers::FlatBufferBuilder fbb; 297 flatbuffers::FlatBufferBuilder fbb;
@@ -311,8 +302,6 @@ KAsync::Job<void> ResourceAccess::synchronizeResource(const Sink::QueryBase &que
311 } 302 }
312 auto q = fbb.CreateString(queryString.toStdString()); 303 auto q = fbb.CreateString(queryString.toStdString());
313 auto builder = Sink::Commands::SynchronizeBuilder(fbb); 304 auto builder = Sink::Commands::SynchronizeBuilder(fbb);
314 builder.add_sourceSync(true);
315 builder.add_localSync(false);
316 builder.add_query(q); 305 builder.add_query(q);
317 Sink::Commands::FinishSynchronizeBuffer(fbb, builder.Finish()); 306 Sink::Commands::FinishSynchronizeBuffer(fbb, builder.Finish());
318 307
@@ -390,6 +379,16 @@ ResourceAccess::sendInspectionCommand(int inspectionType, const QByteArray &insp
390 return sendCommand(Sink::Commands::InspectionCommand, fbb); 379 return sendCommand(Sink::Commands::InspectionCommand, fbb);
391} 380}
392 381
382KAsync::Job<void> ResourceAccess::sendFlushCommand(int flushType, const QByteArray &flushId)
383{
384 flatbuffers::FlatBufferBuilder fbb;
385 auto id = fbb.CreateString(flushId.toStdString());
386 auto location = Sink::Commands::CreateFlush(fbb, id, flushType);
387 Sink::Commands::FinishFlushBuffer(fbb, location);
388 open();
389 return sendCommand(Sink::Commands::FlushCommand, fbb);
390}
391
393void ResourceAccess::open() 392void ResourceAccess::open()
394{ 393{
395 if (d->socket && d->socket->isValid()) { 394 if (d->socket && d->socket->isValid()) {
@@ -613,6 +612,8 @@ bool ResourceAccess::processMessageBuffer()
613 [[clang::fallthrough]]; 612 [[clang::fallthrough]];
614 case Sink::Notification::Warning: 613 case Sink::Notification::Warning:
615 [[clang::fallthrough]]; 614 [[clang::fallthrough]];
615 case Sink::Notification::FlushCompletion:
616 [[clang::fallthrough]];
616 case Sink::Notification::Progress: { 617 case Sink::Notification::Progress: {
617 auto n = getNotification(buffer); 618 auto n = getNotification(buffer);
618 SinkTrace() << "Received notification: " << n.type; 619 SinkTrace() << "Received notification: " << n.type;
diff --git a/common/resourceaccess.h b/common/resourceaccess.h
index 755c8a7..4229161 100644
--- a/common/resourceaccess.h
+++ b/common/resourceaccess.h
@@ -29,6 +29,7 @@
29 29
30#include <flatbuffers/flatbuffers.h> 30#include <flatbuffers/flatbuffers.h>
31#include "notification.h" 31#include "notification.h"
32#include "flush.h"
32#include "query.h" 33#include "query.h"
33#include "log.h" 34#include "log.h"
34 35
@@ -50,7 +51,6 @@ public:
50 } 51 }
51 virtual KAsync::Job<void> sendCommand(int commandId) = 0; 52 virtual KAsync::Job<void> sendCommand(int commandId) = 0;
52 virtual KAsync::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) = 0; 53 virtual KAsync::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) = 0;
53 virtual KAsync::Job<void> synchronizeResource(bool remoteSync, bool localSync) = 0;
54 virtual KAsync::Job<void> synchronizeResource(const Sink::QueryBase &filter) = 0; 54 virtual KAsync::Job<void> synchronizeResource(const Sink::QueryBase &filter) = 0;
55 55
56 virtual KAsync::Job<void> sendCreateCommand(const QByteArray &uid, const QByteArray &resourceBufferType, const QByteArray &buffer) 56 virtual KAsync::Job<void> sendCreateCommand(const QByteArray &uid, const QByteArray &resourceBufferType, const QByteArray &buffer)
@@ -75,6 +75,11 @@ public:
75 return KAsync::null<void>(); 75 return KAsync::null<void>();
76 }; 76 };
77 77
78 virtual KAsync::Job<void> sendFlushCommand(int flushType, const QByteArray &flushId)
79 {
80 return KAsync::null<void>();
81 }
82
78 int getResourceStatus() const 83 int getResourceStatus() const
79 { 84 {
80 return mResourceStatus; 85 return mResourceStatus;
@@ -108,7 +113,6 @@ public:
108 113
109 KAsync::Job<void> sendCommand(int commandId) Q_DECL_OVERRIDE; 114 KAsync::Job<void> sendCommand(int commandId) Q_DECL_OVERRIDE;
110 KAsync::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) Q_DECL_OVERRIDE; 115 KAsync::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) Q_DECL_OVERRIDE;
111 KAsync::Job<void> synchronizeResource(bool remoteSync, bool localSync) Q_DECL_OVERRIDE;
112 KAsync::Job<void> synchronizeResource(const Sink::QueryBase &filter) Q_DECL_OVERRIDE; 116 KAsync::Job<void> synchronizeResource(const Sink::QueryBase &filter) Q_DECL_OVERRIDE;
113 KAsync::Job<void> sendCreateCommand(const QByteArray &uid, const QByteArray &resourceBufferType, const QByteArray &buffer) Q_DECL_OVERRIDE; 117 KAsync::Job<void> sendCreateCommand(const QByteArray &uid, const QByteArray &resourceBufferType, const QByteArray &buffer) Q_DECL_OVERRIDE;
114 KAsync::Job<void> 118 KAsync::Job<void>
@@ -117,6 +121,7 @@ public:
117 KAsync::Job<void> sendRevisionReplayedCommand(qint64 revision) Q_DECL_OVERRIDE; 121 KAsync::Job<void> sendRevisionReplayedCommand(qint64 revision) Q_DECL_OVERRIDE;
118 KAsync::Job<void> 122 KAsync::Job<void>
119 sendInspectionCommand(int inspectionType,const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expecedValue) Q_DECL_OVERRIDE; 123 sendInspectionCommand(int inspectionType,const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expecedValue) Q_DECL_OVERRIDE;
124 KAsync::Job<void> sendFlushCommand(int flushType, const QByteArray &flushId) Q_DECL_OVERRIDE;
120 /** 125 /**
121 * Tries to connect to server, and returns a connected socket on success. 126 * Tries to connect to server, and returns a connected socket on success.
122 */ 127 */
diff --git a/common/resourcecontrol.cpp b/common/resourcecontrol.cpp
index 3568844..af98b8b 100644
--- a/common/resourcecontrol.cpp
+++ b/common/resourcecontrol.cpp
@@ -85,17 +85,39 @@ KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArrayList &resou
85 SinkTrace() << "flushMessageQueue" << resourceIdentifier; 85 SinkTrace() << "flushMessageQueue" << resourceIdentifier;
86 return KAsync::value(resourceIdentifier) 86 return KAsync::value(resourceIdentifier)
87 .template each([](const QByteArray &resource) { 87 .template each([](const QByteArray &resource) {
88 SinkTrace() << "Flushing message queue " << resource; 88 return flushMessageQueue(resource);
89 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource));
90 resourceAccess->open();
91 return resourceAccess->synchronizeResource(false, true)
92 .addToContext(resourceAccess);
93 }); 89 });
94} 90}
95 91
96KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArray &resourceIdentifier) 92KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArray &resourceIdentifier)
97{ 93{
98 return flushMessageQueue(QByteArrayList() << resourceIdentifier); 94 return flush(Flush::FlushUserQueue, resourceIdentifier).then(flush(Flush::FlushSynchronization, resourceIdentifier));
95}
96
97KAsync::Job<void> ResourceControl::flush(Flush::FlushType type, const QByteArray &resourceIdentifier)
98{
99 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resourceIdentifier, ResourceConfig::getResourceType(resourceIdentifier));
100 auto notifier = QSharedPointer<Sink::Notifier>::create(resourceAccess);
101 auto id = QUuid::createUuid().toByteArray();
102 return KAsync::start<void>([=](KAsync::Future<void> &future) {
103 SinkTrace() << "Waiting for notification notification " << id;
104 notifier->registerHandler([&future, id](const Notification &notification) {
105 SinkTrace() << "Received notification " << notification.type << notification.id;
106 if (notification.id == id) {
107 SinkTrace() << "FlushComplete";
108 if (notification.code) {
109 SinkWarning() << "Flush return an error";
110 future.setError(-1, "Flush returned an error: " + notification.message);
111 } else {
112 future.setFinished();
113 }
114 }
115 });
116 resourceAccess->sendFlushCommand(type, id).onError([&future] (const KAsync::Error &error) {
117 SinkWarning() << "Failed to send command";
118 future.setError(1, "Failed to send command: " + error.errorMessage);
119 }).exec();
120 });
99} 121}
100 122
101KAsync::Job<void> ResourceControl::flushReplayQueue(const QByteArrayList &resourceIdentifier) 123KAsync::Job<void> ResourceControl::flushReplayQueue(const QByteArrayList &resourceIdentifier)
diff --git a/common/resourcecontrol.h b/common/resourcecontrol.h
index 9e603e4..b910441 100644
--- a/common/resourcecontrol.h
+++ b/common/resourcecontrol.h
@@ -26,6 +26,7 @@
26#include <Async/Async> 26#include <Async/Async>
27 27
28#include "inspection.h" 28#include "inspection.h"
29#include "flush.h"
29 30
30namespace Sink { 31namespace Sink {
31namespace ResourceControl { 32namespace ResourceControl {
@@ -58,5 +59,8 @@ KAsync::Job<void> SINK_EXPORT flushMessageQueue(const QByteArray &resourceIdenti
58 */ 59 */
59KAsync::Job<void> SINK_EXPORT flushReplayQueue(const QByteArrayList &resourceIdentifier); 60KAsync::Job<void> SINK_EXPORT flushReplayQueue(const QByteArrayList &resourceIdentifier);
60KAsync::Job<void> SINK_EXPORT flushReplayQueue(const QByteArray &resourceIdentifier); 61KAsync::Job<void> SINK_EXPORT flushReplayQueue(const QByteArray &resourceIdentifier);
62
63KAsync::Job<void> SINK_EXPORT flush(Flush::FlushType, const QByteArray &resourceIdentifier);
64
61} 65}
62} 66}
diff --git a/common/store.cpp b/common/store.cpp
index 6aae00f..8b8de1f 100644
--- a/common/store.cpp
+++ b/common/store.cpp
@@ -255,23 +255,7 @@ KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier)
255 255
256KAsync::Job<void> Store::synchronize(const Sink::Query &query) 256KAsync::Job<void> Store::synchronize(const Sink::Query &query)
257{ 257{
258 auto resources = getResources(query.getResourceFilter()).keys(); 258 return synchronize(Sink::SyncScope{static_cast<Sink::QueryBase>(query)});
259 SinkTrace() << "synchronize" << resources;
260 return KAsync::value(resources)
261 .template each([query](const QByteArray &resource) {
262 SinkTrace() << "Synchronizing " << resource;
263 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource));
264 return resourceAccess->synchronizeResource(true, false)
265 .addToContext(resourceAccess)
266 .then<void>([](const KAsync::Error &error) {
267 if (error) {
268 SinkWarning() << "Error during sync.";
269 return KAsync::error<void>(error);
270 }
271 SinkTrace() << "synced.";
272 return KAsync::null<void>();
273 });
274 });
275} 259}
276 260
277KAsync::Job<void> Store::synchronize(const Sink::SyncScope &scope) 261KAsync::Job<void> Store::synchronize(const Sink::SyncScope &scope)
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 5bde597..f7dd816 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -27,6 +27,8 @@
27#include "createentity_generated.h" 27#include "createentity_generated.h"
28#include "modifyentity_generated.h" 28#include "modifyentity_generated.h"
29#include "deleteentity_generated.h" 29#include "deleteentity_generated.h"
30#include "flush_generated.h"
31#include "notification_generated.h"
30 32
31SINK_DEBUG_AREA("synchronizer") 33SINK_DEBUG_AREA("synchronizer")
32 34
@@ -263,6 +265,13 @@ KAsync::Job<void> Synchronizer::synchronize(const Sink::QueryBase &query)
263 return processSyncQueue(); 265 return processSyncQueue();
264} 266}
265 267
268void Synchronizer::flush(int commandId, const QByteArray &flushId)
269{
270 SinkTrace() << "Flushing the synchronization queue";
271 mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::Flush, commandId, flushId};
272 processSyncQueue().exec();
273}
274
266KAsync::Job<void> Synchronizer::processSyncQueue() 275KAsync::Job<void> Synchronizer::processSyncQueue()
267{ 276{
268 if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { 277 if (mSyncRequestQueue.isEmpty() || mSyncInProgress) {
@@ -279,6 +288,20 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
279 //Commit after every request, so implementations only have to commit more if they add a lot of data. 288 //Commit after every request, so implementations only have to commit more if they add a lot of data.
280 commit(); 289 commit();
281 }); 290 });
291 } else if (request.requestType == Synchronizer::SyncRequest::Flush) {
292 if (request.flushType == Flush::FlushReplayQueue) {
293 SinkTrace() << "Emitting flush completion.";
294 Sink::Notification n;
295 n.type = Sink::Notification::FlushCompletion;
296 n.id = request.flushId;
297 emit notify(n);
298 } else {
299 flatbuffers::FlatBufferBuilder fbb;
300 auto flushId = fbb.CreateString(request.flushId);
301 auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization));
302 Sink::Commands::FinishFlushBuffer(fbb, location);
303 enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb));
304 }
282 } else { 305 } else {
283 job = replayNextRevision(); 306 job = replayNextRevision();
284 } 307 }
diff --git a/common/synchronizer.h b/common/synchronizer.h
index 4d5bdd5..99d4877 100644
--- a/common/synchronizer.h
+++ b/common/synchronizer.h
@@ -45,6 +45,7 @@ public:
45 45
46 void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback, MessageQueue &messageQueue); 46 void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback, MessageQueue &messageQueue);
47 KAsync::Job<void> synchronize(const Sink::QueryBase &query); 47 KAsync::Job<void> synchronize(const Sink::QueryBase &query);
48 void flush(int commandId, const QByteArray &flushId);
48 49
49 //Read only access to main storage 50 //Read only access to main storage
50 Storage::EntityStore &store(); 51 Storage::EntityStore &store();
@@ -57,6 +58,9 @@ public:
57 58
58 bool allChangesReplayed() Q_DECL_OVERRIDE; 59 bool allChangesReplayed() Q_DECL_OVERRIDE;
59 60
61signals:
62 void notify(Notification);
63
60public slots: 64public slots:
61 virtual void revisionChanged() Q_DECL_OVERRIDE; 65 virtual void revisionChanged() Q_DECL_OVERRIDE;
62 66
@@ -115,23 +119,30 @@ protected:
115 struct SyncRequest { 119 struct SyncRequest {
116 enum RequestType { 120 enum RequestType {
117 Synchronization, 121 Synchronization,
118 ChangeReplay 122 ChangeReplay,
123 Flush
119 }; 124 };
120 125
121 SyncRequest(const Sink::QueryBase &q) 126 SyncRequest(const Sink::QueryBase &q)
122 : flushQueue(false), 127 : requestType(Synchronization),
123 requestType(Synchronization),
124 query(q) 128 query(q)
125 { 129 {
126 } 130 }
127 131
128 SyncRequest(RequestType type) 132 SyncRequest(RequestType type)
129 : flushQueue(false), 133 : requestType(type)
134 {
135 }
136
137 SyncRequest(RequestType type, int flushType_, const QByteArray &flushId_)
138 : flushType(flushType_),
139 flushId(flushId_),
130 requestType(type) 140 requestType(type)
131 { 141 {
132 } 142 }
133 143
134 bool flushQueue; 144 int flushType = 0;
145 QByteArray flushId;
135 RequestType requestType; 146 RequestType requestType;
136 Sink::QueryBase query; 147 Sink::QueryBase query;
137 }; 148 };