summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/commandprocessor.cpp210
-rw-r--r--common/commandprocessor.h19
-rw-r--r--common/genericresource.cpp167
-rw-r--r--common/genericresource.h13
-rw-r--r--common/listener.cpp40
-rw-r--r--common/resource.cpp10
-rw-r--r--common/resource.h10
-rw-r--r--common/synchronizer.h2
8 files changed, 241 insertions, 230 deletions
diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp
index 57fe524..bdff905 100644
--- a/common/commandprocessor.cpp
+++ b/common/commandprocessor.cpp
@@ -19,25 +19,171 @@
19 */ 19 */
20#include "commandprocessor.h" 20#include "commandprocessor.h"
21 21
22#include <QDataStream>
23
22#include "commands.h" 24#include "commands.h"
23#include "messagequeue.h" 25#include "messagequeue.h"
24#include "queuedcommand_generated.h"
25#include "flush_generated.h" 26#include "flush_generated.h"
26#include "inspector.h" 27#include "inspector.h"
27#include "synchronizer.h" 28#include "synchronizer.h"
28#include "pipeline.h" 29#include "pipeline.h"
29#include "bufferutils.h" 30#include "bufferutils.h"
31#include "definitions.h"
32#include "storage.h"
33
34#include "queuedcommand_generated.h"
35#include "revisionreplayed_generated.h"
36#include "synchronize_generated.h"
30 37
31static int sBatchSize = 100; 38static int sBatchSize = 100;
39// This interval directly affects the roundtrip time of single commands
40static int sCommitInterval = 10;
41
32 42
33using namespace Sink; 43using namespace Sink;
44using namespace Sink::Storage;
34 45
35CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false), mLowerBoundRevision(0) 46CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId)
47 : QObject(),
48 mPipeline(pipeline),
49 mUserQueue(Sink::storageLocation(), instanceId + ".userqueue"),
50 mSynchronizerQueue(Sink::storageLocation(), instanceId + ".synchronizerqueue"),
51 mCommandQueues(QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue), mProcessingLock(false), mLowerBoundRevision(0)
36{ 52{
37 for (auto queue : mCommandQueues) { 53 for (auto queue : mCommandQueues) {
38 const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); 54 const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process);
39 Q_UNUSED(ret); 55 Q_UNUSED(ret);
40 } 56 }
57
58 mCommitQueueTimer.setInterval(sCommitInterval);
59 mCommitQueueTimer.setSingleShot(true);
60 QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit);
61}
62
63static void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data)
64{
65 flatbuffers::FlatBufferBuilder fbb;
66 auto commandData = Sink::EntityBuffer::appendAsVector(fbb, data.constData(), data.size());
67 auto buffer = Sink::CreateQueuedCommand(fbb, commandId, commandData);
68 Sink::FinishQueuedCommandBuffer(fbb, buffer);
69 mq.enqueue(fbb.GetBufferPointer(), fbb.GetSize());
70}
71
72void CommandProcessor::processCommand(int commandId, const QByteArray &data)
73{
74 switch (commandId) {
75 case Commands::FlushCommand:
76 processFlushCommand(data);
77 break;
78 case Commands::SynchronizeCommand:
79 processSynchronizeCommand(data);
80 break;
81 // case Commands::RevisionReplayedCommand:
82 // processRevisionReplayedCommand(data);
83 // break;
84 default: {
85 static int modifications = 0;
86 mUserQueue.startTransaction();
87 enqueueCommand(mUserQueue, commandId, data);
88 modifications++;
89 if (modifications >= sBatchSize) {
90 mUserQueue.commit();
91 modifications = 0;
92 mCommitQueueTimer.stop();
93 } else {
94 mCommitQueueTimer.start();
95 }
96 }
97 };
98}
99
100void CommandProcessor::processFlushCommand(const QByteArray &data)
101{
102 flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size());
103 if (Sink::Commands::VerifyFlushBuffer(verifier)) {
104 auto buffer = Sink::Commands::GetFlush(data.constData());
105 const auto flushType = buffer->type();
106 const auto flushId = BufferUtils::extractBuffer(buffer->id());
107 if (flushType == Sink::Flush::FlushSynchronization) {
108 mSynchronizer->flush(flushType, flushId);
109 } else {
110 mUserQueue.startTransaction();
111 enqueueCommand(mUserQueue, Commands::FlushCommand, data);
112 mUserQueue.commit();
113 }
114 }
115
116}
117
118void CommandProcessor::processSynchronizeCommand(const QByteArray &data)
119{
120 flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size());
121 if (Sink::Commands::VerifySynchronizeBuffer(verifier)) {
122 auto buffer = Sink::Commands::GetSynchronize(data.constData());
123 auto timer = QSharedPointer<QTime>::create();
124 timer->start();
125 auto job = KAsync::null<void>();
126 Sink::QueryBase query;
127 if (buffer->query()) {
128 auto data = QByteArray::fromStdString(buffer->query()->str());
129 QDataStream stream(&data, QIODevice::ReadOnly);
130 stream >> query;
131 }
132 job = synchronizeWithSource(query);
133 job.then<void>([timer](const KAsync::Error &error) {
134 if (error) {
135 SinkWarning() << "Sync failed: " << error.errorMessage;
136 return KAsync::error(error);
137 } else {
138 SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed());
139 return KAsync::null();
140 }
141 })
142 .exec();
143 return;
144 } else {
145 SinkWarning() << "received invalid command";
146 }
147}
148
149// void CommandProcessor::processRevisionReplayedCommand(const QByteArray &data)
150// {
151// flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size());
152// if (Sink::Commands::VerifyRevisionReplayedBuffer(verifier)) {
153// auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData());
154// client.currentRevision = buffer->revision();
155// } else {
156// SinkWarning() << "received invalid command";
157// }
158// loadResource().setLowerBoundRevision(lowerBoundRevision());
159// }
160
161KAsync::Job<void> CommandProcessor::synchronizeWithSource(const Sink::QueryBase &query)
162{
163 return KAsync::start<void>([this, query] {
164 Sink::Notification n;
165 n.id = "sync";
166 n.type = Sink::Notification::Status;
167 n.message = "Synchronization has started.";
168 n.code = Sink::ApplicationDomain::BusyStatus;
169 emit notify(n);
170
171 SinkLog() << " Synchronizing";
172 return mSynchronizer->synchronize(query)
173 .then<void>([this](const KAsync::Error &error) {
174 if (!error) {
175 SinkLog() << "Done Synchronizing";
176 Sink::Notification n;
177 n.id = "sync";
178 n.type = Sink::Notification::Status;
179 n.message = "Synchronization has ended.";
180 n.code = Sink::ApplicationDomain::ConnectedStatus;
181 emit notify(n);
182 return KAsync::null();
183 }
184 return KAsync::error(error);
185 });
186 });
41} 187}
42 188
43void CommandProcessor::setOldestUsedRevision(qint64 revision) 189void CommandProcessor::setOldestUsedRevision(qint64 revision)
@@ -186,6 +332,27 @@ void CommandProcessor::setInspector(const QSharedPointer<Inspector> &inspector)
186void CommandProcessor::setSynchronizer(const QSharedPointer<Synchronizer> &synchronizer) 332void CommandProcessor::setSynchronizer(const QSharedPointer<Synchronizer> &synchronizer)
187{ 333{
188 mSynchronizer = synchronizer; 334 mSynchronizer = synchronizer;
335 mSynchronizer->setup([this](int commandId, const QByteArray &data) {
336 enqueueCommand(mSynchronizerQueue, commandId, data);
337 }, mSynchronizerQueue);
338
339 QObject::connect(mSynchronizer.data(), &Synchronizer::replayingChanges, [this]() {
340 Sink::Notification n;
341 n.id = "changereplay";
342 n.type = Sink::Notification::Status;
343 n.message = "Replaying changes.";
344 n.code = Sink::ApplicationDomain::BusyStatus;
345 emit notify(n);
346 });
347 QObject::connect(mSynchronizer.data(), &Synchronizer::changesReplayed, [this]() {
348 Sink::Notification n;
349 n.id = "changereplay";
350 n.type = Sink::Notification::Status;
351 n.message = "All changes have been replayed.";
352 n.code = Sink::ApplicationDomain::ConnectedStatus;
353 emit notify(n);
354 });
355
189 QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify); 356 QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify);
190 setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); 357 setOldestUsedRevision(mSynchronizer->getLastReplayedRevision());
191} 358}
@@ -213,3 +380,42 @@ KAsync::Job<void> CommandProcessor::flush(void const *command, size_t size)
213 return KAsync::error<void>(-1, "Invalid flush command."); 380 return KAsync::error<void>(-1, "Invalid flush command.");
214} 381}
215 382
383static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue)
384{
385 if (queue.isEmpty()) {
386 f.setFinished();
387 } else {
388 QObject::connect(&queue, &MessageQueue::drained, [&f]() { f.setFinished(); });
389 }
390};
391
392KAsync::Job<void> CommandProcessor::processAllMessages()
393{
394 // We have to wait for all items to be processed to ensure the synced items are available when a query gets executed.
395 // TODO: report errors while processing sync?
396 // TODO JOBAPI: A helper that waits for n events and then continues?
397 return KAsync::start<void>([this](KAsync::Future<void> &f) {
398 if (mCommitQueueTimer.isActive()) {
399 auto context = new QObject;
400 QObject::connect(&mCommitQueueTimer, &QTimer::timeout, context, [&f, context]() {
401 delete context;
402 f.setFinished();
403 });
404 } else {
405 f.setFinished();
406 }
407 })
408 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); })
409 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); })
410 .then<void>([this](KAsync::Future<void> &f) {
411 if (mSynchronizer->allChangesReplayed()) {
412 f.setFinished();
413 } else {
414 auto context = new QObject;
415 QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, context, [&f, context]() {
416 delete context;
417 f.setFinished();
418 });
419 }
420 });
421}
diff --git a/common/commandprocessor.h b/common/commandprocessor.h
index d00cf43..a807f46 100644
--- a/common/commandprocessor.h
+++ b/common/commandprocessor.h
@@ -22,19 +22,20 @@
22#include "sink_export.h" 22#include "sink_export.h"
23 23
24#include <QObject> 24#include <QObject>
25#include <QTimer>
25#include <Async/Async> 26#include <Async/Async>
26#include <functional> 27#include <functional>
27 28
28#include "log.h" 29#include "log.h"
29#include "notification.h" 30#include "notification.h"
30 31#include "messagequeue.h"
31class MessageQueue;
32 32
33namespace Sink { 33namespace Sink {
34 class Pipeline; 34 class Pipeline;
35 class Inspector; 35 class Inspector;
36 class Synchronizer; 36 class Synchronizer;
37 class QueuedCommand; 37 class QueuedCommand;
38 class QueryBase;
38 39
39/** 40/**
40 * Drives the pipeline using the output from all command queues 41 * Drives the pipeline using the output from all command queues
@@ -45,13 +46,17 @@ class CommandProcessor : public QObject
45 SINK_DEBUG_AREA("commandprocessor") 46 SINK_DEBUG_AREA("commandprocessor")
46 47
47public: 48public:
48 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues); 49 CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId);
49 50
50 void setOldestUsedRevision(qint64 revision); 51 void setOldestUsedRevision(qint64 revision);
51 52
52 void setInspector(const QSharedPointer<Inspector> &inspector); 53 void setInspector(const QSharedPointer<Inspector> &inspector);
53 void setSynchronizer(const QSharedPointer<Synchronizer> &synchronizer); 54 void setSynchronizer(const QSharedPointer<Synchronizer> &synchronizer);
54 55
56 void processCommand(int commandId, const QByteArray &data);
57
58 KAsync::Job<void> processAllMessages();
59
55signals: 60signals:
56 void notify(Notification); 61 void notify(Notification);
57 void error(int errorCode, const QString &errorMessage); 62 void error(int errorCode, const QString &errorMessage);
@@ -68,9 +73,16 @@ private slots:
68 KAsync::Job<void> processPipeline(); 73 KAsync::Job<void> processPipeline();
69 74
70private: 75private:
76 void processFlushCommand(const QByteArray &data);
77 void processSynchronizeCommand(const QByteArray &data);
78 // void processRevisionReplayedCommand(const QByteArray &data);
79
71 KAsync::Job<void> flush(void const *command, size_t size); 80 KAsync::Job<void> flush(void const *command, size_t size);
81 KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query);
72 82
73 Sink::Pipeline *mPipeline; 83 Sink::Pipeline *mPipeline;
84 MessageQueue mUserQueue;
85 MessageQueue mSynchronizerQueue;
74 // Ordered by priority 86 // Ordered by priority
75 QList<MessageQueue *> mCommandQueues; 87 QList<MessageQueue *> mCommandQueues;
76 bool mProcessingLock; 88 bool mProcessingLock;
@@ -78,6 +90,7 @@ private:
78 qint64 mLowerBoundRevision; 90 qint64 mLowerBoundRevision;
79 QSharedPointer<Synchronizer> mSynchronizer; 91 QSharedPointer<Synchronizer> mSynchronizer;
80 QSharedPointer<Inspector> mInspector; 92 QSharedPointer<Inspector> mInspector;
93 QTimer mCommitQueueTimer;
81}; 94};
82 95
83}; 96};
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 38da6bf..82112b3 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -19,32 +19,13 @@
19 */ 19 */
20#include "genericresource.h" 20#include "genericresource.h"
21 21
22#include "entitybuffer.h"
23#include "pipeline.h" 22#include "pipeline.h"
24#include "queuedcommand_generated.h"
25#include "createentity_generated.h"
26#include "modifyentity_generated.h"
27#include "deleteentity_generated.h"
28#include "inspection_generated.h"
29#include "notification_generated.h"
30#include "flush_generated.h"
31#include "domainadaptor.h" 23#include "domainadaptor.h"
32#include "commands.h"
33#include "index.h"
34#include "log.h" 24#include "log.h"
35#include "definitions.h"
36#include "bufferutils.h"
37#include "adaptorfactoryregistry.h"
38#include "synchronizer.h" 25#include "synchronizer.h"
39#include "commandprocessor.h" 26#include "commandprocessor.h"
40 27#include "definitions.h"
41#include <QUuid> 28#include "storage.h"
42#include <QDataStream>
43#include <QTime>
44
45static int sBatchSize = 100;
46// This interval directly affects the roundtrip time of single commands
47static int sCommitInterval = 10;
48 29
49using namespace Sink; 30using namespace Sink;
50using namespace Sink::Storage; 31using namespace Sink::Storage;
@@ -52,20 +33,14 @@ using namespace Sink::Storage;
52GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline ) 33GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline )
53 : Sink::Resource(), 34 : Sink::Resource(),
54 mResourceContext(resourceContext), 35 mResourceContext(resourceContext),
55 mUserQueue(Sink::storageLocation(), resourceContext.instanceId() + ".userqueue"),
56 mSynchronizerQueue(Sink::storageLocation(), resourceContext.instanceId() + ".synchronizerqueue"),
57 mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceContext)), 36 mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceContext)),
37 mProcessor(QSharedPointer<CommandProcessor>::create(mPipeline.data(), resourceContext.instanceId())),
58 mError(0), 38 mError(0),
59 mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) 39 mClientLowerBoundRevision(std::numeric_limits<qint64>::max())
60{ 40{
61 mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue)); 41 QObject::connect(mProcessor.data(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
62 QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 42 QObject::connect(mProcessor.data(), &CommandProcessor::notify, this, &GenericResource::notify);
63 QObject::connect(mProcessor.get(), &CommandProcessor::notify, this, &GenericResource::notify);
64 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); 43 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
65
66 mCommitQueueTimer.setInterval(sCommitInterval);
67 mCommitQueueTimer.setSingleShot(true);
68 QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit);
69} 44}
70 45
71GenericResource::~GenericResource() 46GenericResource::~GenericResource()
@@ -80,32 +55,6 @@ void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<S
80void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer) 55void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer)
81{ 56{
82 mSynchronizer = synchronizer; 57 mSynchronizer = synchronizer;
83 mSynchronizer->setup([this](int commandId, const QByteArray &data) {
84 enqueueCommand(mSynchronizerQueue, commandId, data);
85 }, mSynchronizerQueue);
86 {
87 auto ret = QObject::connect(mSynchronizer.data(), &Synchronizer::replayingChanges, [this]() {
88 Sink::Notification n;
89 n.id = "changereplay";
90 n.type = Sink::Notification::Status;
91 n.message = "Replaying changes.";
92 n.code = Sink::ApplicationDomain::BusyStatus;
93 emit notify(n);
94 });
95 Q_ASSERT(ret);
96 }
97 {
98 auto ret = QObject::connect(mSynchronizer.data(), &Synchronizer::changesReplayed, [this]() {
99 Sink::Notification n;
100 n.id = "changereplay";
101 n.type = Sink::Notification::Status;
102 n.message = "All changes have been replayed.";
103 n.code = Sink::ApplicationDomain::ConnectedStatus;
104 emit notify(n);
105 });
106 Q_ASSERT(ret);
107 }
108
109 mProcessor->setSynchronizer(synchronizer); 58 mProcessor->setSynchronizer(synchronizer);
110 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); 59 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection);
111 QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); 60 QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
@@ -146,119 +95,19 @@ int GenericResource::error() const
146 return mError; 95 return mError;
147} 96}
148 97
149void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data)
150{
151 flatbuffers::FlatBufferBuilder fbb;
152 auto commandData = Sink::EntityBuffer::appendAsVector(fbb, data.constData(), data.size());
153 auto buffer = Sink::CreateQueuedCommand(fbb, commandId, commandData);
154 Sink::FinishQueuedCommandBuffer(fbb, buffer);
155 mq.enqueue(fbb.GetBufferPointer(), fbb.GetSize());
156}
157
158void GenericResource::processCommand(int commandId, const QByteArray &data) 98void GenericResource::processCommand(int commandId, const QByteArray &data)
159{ 99{
160 if (commandId == Commands::FlushCommand) { 100 mProcessor->processCommand(commandId, data);
161 processFlushCommand(data);
162 return;
163 }
164 static int modifications = 0;
165 mUserQueue.startTransaction();
166 enqueueCommand(mUserQueue, commandId, data);
167 modifications++;
168 if (modifications >= sBatchSize) {
169 mUserQueue.commit();
170 modifications = 0;
171 mCommitQueueTimer.stop();
172 } else {
173 mCommitQueueTimer.start();
174 }
175}
176
177void GenericResource::processFlushCommand(const QByteArray &data)
178{
179 flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size());
180 if (Sink::Commands::VerifyFlushBuffer(verifier)) {
181 auto buffer = Sink::Commands::GetFlush(data.constData());
182 const auto flushType = buffer->type();
183 const auto flushId = BufferUtils::extractBuffer(buffer->id());
184 if (flushType == Sink::Flush::FlushSynchronization) {
185 mSynchronizer->flush(flushType, flushId);
186 } else {
187 mUserQueue.startTransaction();
188 enqueueCommand(mUserQueue, Commands::FlushCommand, data);
189 mUserQueue.commit();
190 }
191 }
192
193} 101}
194 102
195KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase &query) 103KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase &query)
196{ 104{
197 return KAsync::start<void>([this, query] { 105 return mSynchronizer->synchronize(query);
198
199 Sink::Notification n;
200 n.id = "sync";
201 n.type = Sink::Notification::Status;
202 n.message = "Synchronization has started.";
203 n.code = Sink::ApplicationDomain::BusyStatus;
204 emit notify(n);
205
206 SinkLog() << " Synchronizing";
207 return mSynchronizer->synchronize(query)
208 .then<void>([this](const KAsync::Error &error) {
209 if (!error) {
210 SinkLog() << "Done Synchronizing";
211 Sink::Notification n;
212 n.id = "sync";
213 n.type = Sink::Notification::Status;
214 n.message = "Synchronization has ended.";
215 n.code = Sink::ApplicationDomain::ConnectedStatus;
216 emit notify(n);
217 return KAsync::null();
218 }
219 return KAsync::error(error);
220 });
221 });
222} 106}
223 107
224static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue)
225{
226 if (queue.isEmpty()) {
227 f.setFinished();
228 } else {
229 QObject::connect(&queue, &MessageQueue::drained, [&f]() { f.setFinished(); });
230 }
231};
232
233KAsync::Job<void> GenericResource::processAllMessages() 108KAsync::Job<void> GenericResource::processAllMessages()
234{ 109{
235 // We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. 110 return mProcessor->processAllMessages();
236 // TODO: report errors while processing sync?
237 // TODO JOBAPI: A helper that waits for n events and then continues?
238 return KAsync::start<void>([this](KAsync::Future<void> &f) {
239 if (mCommitQueueTimer.isActive()) {
240 auto context = new QObject;
241 QObject::connect(&mCommitQueueTimer, &QTimer::timeout, context, [&f, context]() {
242 delete context;
243 f.setFinished();
244 });
245 } else {
246 f.setFinished();
247 }
248 })
249 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); })
250 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); })
251 .then<void>([this](KAsync::Future<void> &f) {
252 if (mSynchronizer->allChangesReplayed()) {
253 f.setFinished();
254 } else {
255 auto context = new QObject;
256 QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, context, [&f, context]() {
257 delete context;
258 f.setFinished();
259 });
260 }
261 });
262} 111}
263 112
264void GenericResource::updateLowerBoundRevision() 113void GenericResource::updateLowerBoundRevision()
diff --git a/common/genericresource.h b/common/genericresource.h
index 0bc47da..cc73f50 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -47,9 +47,6 @@ public:
47 virtual ~GenericResource(); 47 virtual ~GenericResource();
48 48
49 virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; 49 virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE;
50 virtual void processFlushCommand(const QByteArray &data);
51 virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) Q_DECL_OVERRIDE;
52 virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE;
53 virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE; 50 virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE;
54 51
55 int error() const; 52 int error() const;
@@ -57,6 +54,9 @@ public:
57 static void removeFromDisk(const QByteArray &instanceIdentifier); 54 static void removeFromDisk(const QByteArray &instanceIdentifier);
58 static qint64 diskUsage(const QByteArray &instanceIdentifier); 55 static qint64 diskUsage(const QByteArray &instanceIdentifier);
59 56
57 KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query);
58 KAsync::Job<void> processAllMessages();
59
60private slots: 60private slots:
61 void updateLowerBoundRevision(); 61 void updateLowerBoundRevision();
62 62
@@ -69,15 +69,12 @@ protected:
69 void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); 69 void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data);
70 70
71 ResourceContext mResourceContext; 71 ResourceContext mResourceContext;
72 MessageQueue mUserQueue;
73 MessageQueue mSynchronizerQueue;
74 QSharedPointer<Pipeline> mPipeline;
75 72
76private: 73private:
77 std::unique_ptr<CommandProcessor> mProcessor; 74 QSharedPointer<Pipeline> mPipeline;
75 QSharedPointer<CommandProcessor> mProcessor;
78 QSharedPointer<Synchronizer> mSynchronizer; 76 QSharedPointer<Synchronizer> mSynchronizer;
79 int mError; 77 int mError;
80 QTimer mCommitQueueTimer;
81 qint64 mClientLowerBoundRevision; 78 qint64 mClientLowerBoundRevision;
82}; 79};
83 80
diff --git a/common/listener.cpp b/common/listener.cpp
index 9e80c45..d3ef0f1 100644
--- a/common/listener.cpp
+++ b/common/listener.cpp
@@ -30,15 +30,12 @@
30#include "common/commandcompletion_generated.h" 30#include "common/commandcompletion_generated.h"
31#include "common/handshake_generated.h" 31#include "common/handshake_generated.h"
32#include "common/revisionupdate_generated.h" 32#include "common/revisionupdate_generated.h"
33#include "common/synchronize_generated.h"
34#include "common/notification_generated.h" 33#include "common/notification_generated.h"
35#include "common/revisionreplayed_generated.h" 34#include "common/revisionreplayed_generated.h"
36 35
37#include <QLocalServer> 36#include <QLocalServer>
38#include <QLocalSocket> 37#include <QLocalSocket>
39#include <QTimer> 38#include <QTimer>
40#include <QTime>
41#include <QDataStream>
42 39
43Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType, QObject *parent) 40Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType, QObject *parent)
44 : QObject(parent), 41 : QObject(parent),
@@ -235,39 +232,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
235 } 232 }
236 break; 233 break;
237 } 234 }
238 case Sink::Commands::SynchronizeCommand: { 235 case Sink::Commands::SynchronizeCommand:
239 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size());
240 if (Sink::Commands::VerifySynchronizeBuffer(verifier)) {
241 auto buffer = Sink::Commands::GetSynchronize(commandBuffer.constData());
242 SinkTrace() << QString("Synchronize request (id %1) from %2").arg(messageId).arg(client.name);
243 auto timer = QSharedPointer<QTime>::create();
244 timer->start();
245 auto job = KAsync::null<void>();
246 Sink::QueryBase query;
247 if (buffer->query()) {
248 auto data = QByteArray::fromStdString(buffer->query()->str());
249 QDataStream stream(&data, QIODevice::ReadOnly);
250 stream >> query;
251 }
252 job = loadResource().synchronizeWithSource(query);
253 job.then<void>([callback, timer](const KAsync::Error &error) {
254 if (error) {
255 SinkWarning() << "Sync failed: " << error.errorMessage;
256 callback(false);
257 return KAsync::error(error);
258 } else {
259 SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed());
260 callback(true);
261 return KAsync::null();
262 }
263 })
264 .exec();
265 return;
266 } else {
267 SinkWarning() << "received invalid command";
268 }
269 break;
270 }
271 case Sink::Commands::InspectionCommand: 236 case Sink::Commands::InspectionCommand:
272 case Sink::Commands::DeleteEntityCommand: 237 case Sink::Commands::DeleteEntityCommand:
273 case Sink::Commands::ModifyEntityCommand: 238 case Sink::Commands::ModifyEntityCommand:
@@ -293,7 +258,8 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
293 SinkWarning() << "received invalid command"; 258 SinkWarning() << "received invalid command";
294 } 259 }
295 loadResource().setLowerBoundRevision(lowerBoundRevision()); 260 loadResource().setLowerBoundRevision(lowerBoundRevision());
296 } break; 261 }
262 break;
297 case Sink::Commands::RemoveFromDiskCommand: { 263 case Sink::Commands::RemoveFromDiskCommand: {
298 SinkLog() << QString("Received a remove from disk command from %1").arg(client.name); 264 SinkLog() << QString("Received a remove from disk command from %1").arg(client.name);
299 //Close the resource to ensure no transactions are open 265 //Close the resource to ensure no transactions are open
diff --git a/common/resource.cpp b/common/resource.cpp
index f81f094..533a132 100644
--- a/common/resource.cpp
+++ b/common/resource.cpp
@@ -46,16 +46,6 @@ void Resource::processCommand(int commandId, const QByteArray &data)
46 Q_UNUSED(data) 46 Q_UNUSED(data)
47} 47}
48 48
49KAsync::Job<void> Resource::synchronizeWithSource(const Sink::QueryBase &query)
50{
51 return KAsync::null<void>();
52}
53
54KAsync::Job<void> Resource::processAllMessages()
55{
56 return KAsync::null<void>();
57}
58
59void Resource::setLowerBoundRevision(qint64 revision) 49void Resource::setLowerBoundRevision(qint64 revision)
60{ 50{
61 Q_UNUSED(revision) 51 Q_UNUSED(revision)
diff --git a/common/resource.h b/common/resource.h
index 3cc326c..7789c53 100644
--- a/common/resource.h
+++ b/common/resource.h
@@ -43,16 +43,6 @@ public:
43 virtual void processCommand(int commandId, const QByteArray &data); 43 virtual void processCommand(int commandId, const QByteArray &data);
44 44
45 /** 45 /**
46 * Execute synchronization with the source.
47 */
48 virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &);
49
50 /**
51 * Process all internal messages, thus ensuring the store is up to date and no pending modifications are left.
52 */
53 virtual KAsync::Job<void> processAllMessages();
54
55 /**
56 * Set the lowest revision that is still referenced by external clients. 46 * Set the lowest revision that is still referenced by external clients.
57 */ 47 */
58 virtual void setLowerBoundRevision(qint64 revision); 48 virtual void setLowerBoundRevision(qint64 revision);
diff --git a/common/synchronizer.h b/common/synchronizer.h
index 99d4877..ae597bd 100644
--- a/common/synchronizer.h
+++ b/common/synchronizer.h
@@ -66,7 +66,7 @@ public slots:
66 66
67protected: 67protected:
68 ///Base implementation calls the replay$Type calls 68 ///Base implementation calls the replay$Type calls
69 virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; 69 KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE;
70 virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; 70 virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE;
71 71
72protected: 72protected: