summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp378
1 files changed, 188 insertions, 190 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 74a8cfb..9c9a12f 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -20,7 +20,7 @@
20#include <QTime> 20#include <QTime>
21 21
22static int sBatchSize = 100; 22static int sBatchSize = 100;
23//This interval directly affects the roundtrip time of single commands 23// This interval directly affects the roundtrip time of single commands
24static int sCommitInterval = 10; 24static int sCommitInterval = 10;
25 25
26using namespace Sink; 26using namespace Sink;
@@ -39,26 +39,23 @@ class ChangeReplay : public QObject
39{ 39{
40 Q_OBJECT 40 Q_OBJECT
41public: 41public:
42
43 typedef std::function<KAsync::Job<void>(const QByteArray &type, const QByteArray &key, const QByteArray &value)> ReplayFunction; 42 typedef std::function<KAsync::Job<void>(const QByteArray &type, const QByteArray &key, const QByteArray &value)> ReplayFunction;
44 43
45 ChangeReplay(const QString &resourceName, const ReplayFunction &replayFunction) 44 ChangeReplay(const QString &resourceName, const ReplayFunction &replayFunction)
46 : mStorage(storageLocation(), resourceName, Storage::ReadOnly), 45 : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayFunction(replayFunction)
47 mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite),
48 mReplayFunction(replayFunction)
49 { 46 {
50
51 } 47 }
52 48
53 qint64 getLastReplayedRevision() 49 qint64 getLastReplayedRevision()
54 { 50 {
55 qint64 lastReplayedRevision = 0; 51 qint64 lastReplayedRevision = 0;
56 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly); 52 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly);
57 replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { 53 replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
58 lastReplayedRevision = value.toLongLong(); 54 [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
59 return false; 55 lastReplayedRevision = value.toLongLong();
60 }, [](const Storage::Error &) { 56 return false;
61 }); 57 },
58 [](const Storage::Error &) {});
62 return lastReplayedRevision; 59 return lastReplayedRevision;
63 } 60 }
64 61
@@ -79,28 +76,30 @@ public slots:
79 auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly); 76 auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly);
80 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite); 77 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite);
81 qint64 lastReplayedRevision = 1; 78 qint64 lastReplayedRevision = 1;
82 replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { 79 replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
83 lastReplayedRevision = value.toLongLong(); 80 [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
84 return false; 81 lastReplayedRevision = value.toLongLong();
85 }, [](const Storage::Error &) { 82 return false;
86 }); 83 },
84 [](const Storage::Error &) {});
87 const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); 85 const qint64 topRevision = Storage::maxRevision(mainStoreTransaction);
88 86
89 Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; 87 Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision;
90 if (lastReplayedRevision <= topRevision) { 88 if (lastReplayedRevision <= topRevision) {
91 qint64 revision = lastReplayedRevision; 89 qint64 revision = lastReplayedRevision;
92 for (;revision <= topRevision; revision++) { 90 for (; revision <= topRevision; revision++) {
93 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); 91 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision);
94 const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); 92 const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision);
95 const auto key = Storage::assembleKey(uid, revision); 93 const auto key = Storage::assembleKey(uid, revision);
96 Storage::mainDatabase(mainStoreTransaction, type).scan(key, [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { 94 Storage::mainDatabase(mainStoreTransaction, type)
97 mReplayFunction(type, key, value).exec(); 95 .scan(key,
98 //TODO make for loop async, and pass to async replay function together with type 96 [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool {
99 Trace() << "Replaying " << key; 97 mReplayFunction(type, key, value).exec();
100 return false; 98 // TODO make for loop async, and pass to async replay function together with type
101 }, [key](const Storage::Error &) { 99 Trace() << "Replaying " << key;
102 ErrorMsg() << "Failed to replay change " << key; 100 return false;
103 }); 101 },
102 [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; });
104 } 103 }
105 revision--; 104 revision--;
106 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); 105 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision));
@@ -126,15 +125,12 @@ class CommandProcessor : public QObject
126{ 125{
127 Q_OBJECT 126 Q_OBJECT
128 typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction; 127 typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction;
128
129public: 129public:
130 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue*> commandQueues) 130 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false)
131 : QObject(),
132 mPipeline(pipeline),
133 mCommandQueues(commandQueues),
134 mProcessingLock(false)
135 { 131 {
136 mPipeline->startTransaction(); 132 mPipeline->startTransaction();
137 //FIXME Should be initialized to the current value of the change replay queue 133 // FIXME Should be initialized to the current value of the change replay queue
138 mLowerBoundRevision = Storage::maxRevision(mPipeline->transaction()); 134 mLowerBoundRevision = Storage::maxRevision(mPipeline->transaction());
139 mPipeline->commit(); 135 mPipeline->commit();
140 136
@@ -176,18 +172,20 @@ private slots:
176 return; 172 return;
177 } 173 }
178 mProcessingLock = true; 174 mProcessingLock = true;
179 auto job = processPipeline().then<void>([this]() { 175 auto job = processPipeline()
180 mProcessingLock = false; 176 .then<void>([this]() {
181 if (messagesToProcessAvailable()) { 177 mProcessingLock = false;
182 process(); 178 if (messagesToProcessAvailable()) {
183 } 179 process();
184 }).exec(); 180 }
181 })
182 .exec();
185 } 183 }
186 184
187 KAsync::Job<qint64> processQueuedCommand(const Sink::QueuedCommand *queuedCommand) 185 KAsync::Job<qint64> processQueuedCommand(const Sink::QueuedCommand *queuedCommand)
188 { 186 {
189 Log() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId()); 187 Log() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId());
190 //Throw command into appropriate pipeline 188 // Throw command into appropriate pipeline
191 switch (queuedCommand->commandId()) { 189 switch (queuedCommand->commandId()) {
192 case Sink::Commands::DeleteEntityCommand: 190 case Sink::Commands::DeleteEntityCommand:
193 return mPipeline->deletedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); 191 return mPipeline->deletedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
@@ -197,9 +195,7 @@ private slots:
197 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); 195 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
198 case Sink::Commands::InspectionCommand: 196 case Sink::Commands::InspectionCommand:
199 if (mInspect) { 197 if (mInspect) {
200 return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<qint64>([]() { 198 return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<qint64>([]() { return -1; });
201 return -1;
202 });
203 } else { 199 } else {
204 return KAsync::error<qint64>(-1, "Missing inspection command."); 200 return KAsync::error<qint64>(-1, "Missing inspection command.");
205 } 201 }
@@ -218,50 +214,47 @@ private slots:
218 auto queuedCommand = Sink::GetQueuedCommand(data.constData()); 214 auto queuedCommand = Sink::GetQueuedCommand(data.constData());
219 const auto commandId = queuedCommand->commandId(); 215 const auto commandId = queuedCommand->commandId();
220 Trace() << "Dequeued Command: " << Sink::Commands::name(commandId); 216 Trace() << "Dequeued Command: " << Sink::Commands::name(commandId);
221 return processQueuedCommand(queuedCommand).then<qint64, qint64>( 217 return processQueuedCommand(queuedCommand)
222 [commandId](qint64 createdRevision) -> qint64 { 218 .then<qint64, qint64>(
223 Trace() << "Command pipeline processed: " << Sink::Commands::name(commandId); 219 [commandId](qint64 createdRevision) -> qint64 {
224 return createdRevision; 220 Trace() << "Command pipeline processed: " << Sink::Commands::name(commandId);
225 } 221 return createdRevision;
226 , 222 },
227 [](int errorCode, QString errorMessage) { 223 [](int errorCode, QString errorMessage) {
228 //FIXME propagate error, we didn't handle it 224 // FIXME propagate error, we didn't handle it
229 Warning() << "Error while processing queue command: " << errorMessage; 225 Warning() << "Error while processing queue command: " << errorMessage;
230 } 226 });
231 );
232 } 227 }
233 228
234 //Process all messages of this queue 229 // Process all messages of this queue
235 KAsync::Job<void> processQueue(MessageQueue *queue) 230 KAsync::Job<void> processQueue(MessageQueue *queue)
236 { 231 {
237 auto time = QSharedPointer<QTime>::create(); 232 auto time = QSharedPointer<QTime>::create();
238 return KAsync::start<void>([this](){ 233 return KAsync::start<void>([this]() { mPipeline->startTransaction(); })
239 mPipeline->startTransaction(); 234 .then(KAsync::dowhile([queue]() { return !queue->isEmpty(); },
240 }).then(KAsync::dowhile( 235 [this, queue, time](KAsync::Future<void> &future) {
241 [queue]() { return !queue->isEmpty(); }, 236 queue->dequeueBatch(sBatchSize,
242 [this, queue, time](KAsync::Future<void> &future) { 237 [this, time](const QByteArray &data) {
243 queue->dequeueBatch(sBatchSize, [this, time](const QByteArray &data) { 238 time->start();
244 time->start(); 239 return KAsync::start<void>([this, data, time](KAsync::Future<void> &future) {
245 return KAsync::start<void>([this, data, time](KAsync::Future<void> &future) { 240 processQueuedCommand(data)
246 processQueuedCommand(data).then<void, qint64>([&future, this, time](qint64 createdRevision) { 241 .then<void, qint64>([&future, this, time](qint64 createdRevision) {
247 Trace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); 242 Trace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed());
243 future.setFinished();
244 })
245 .exec();
246 });
247 })
248 .then<void>([&future, queue]() { future.setFinished(); },
249 [&future](int i, QString error) {
250 if (i != MessageQueue::ErrorCodes::NoMessageFound) {
251 Warning() << "Error while getting message from messagequeue: " << error;
252 }
248 future.setFinished(); 253 future.setFinished();
249 }).exec(); 254 })
250 }); 255 .exec();
251 } 256 }))
252 ).then<void>([&future, queue](){ 257 .then<void>([this]() { mPipeline->commit(); });
253 future.setFinished();
254 },
255 [&future](int i, QString error) {
256 if (i != MessageQueue::ErrorCodes::NoMessageFound) {
257 Warning() << "Error while getting message from messagequeue: " << error;
258 }
259 future.setFinished();
260 }).exec();
261 }
262 )).then<void>([this]() {
263 mPipeline->commit();
264 });
265 } 258 }
266 259
267 KAsync::Job<void> processPipeline() 260 KAsync::Job<void> processPipeline()
@@ -276,29 +269,29 @@ private slots:
276 mPipeline->commit(); 269 mPipeline->commit();
277 Trace() << "Cleanup done." << Log::TraceTime(time->elapsed()); 270 Trace() << "Cleanup done." << Log::TraceTime(time->elapsed());
278 271
279 //Go through all message queues 272 // Go through all message queues
280 auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); 273 auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues);
281 return KAsync::dowhile( 274 return KAsync::dowhile([it]() { return it->hasNext(); },
282 [it]() { return it->hasNext(); },
283 [it, this](KAsync::Future<void> &future) { 275 [it, this](KAsync::Future<void> &future) {
284 auto time = QSharedPointer<QTime>::create(); 276 auto time = QSharedPointer<QTime>::create();
285 time->start(); 277 time->start();
286 278
287 auto queue = it->next(); 279 auto queue = it->next();
288 processQueue(queue).then<void>([&future, time]() { 280 processQueue(queue)
289 Trace() << "Queue processed." << Log::TraceTime(time->elapsed()); 281 .then<void>([&future, time]() {
290 future.setFinished(); 282 Trace() << "Queue processed." << Log::TraceTime(time->elapsed());
291 }).exec(); 283 future.setFinished();
292 } 284 })
293 ); 285 .exec();
286 });
294 } 287 }
295 288
296private: 289private:
297 Sink::Pipeline *mPipeline; 290 Sink::Pipeline *mPipeline;
298 //Ordered by priority 291 // Ordered by priority
299 QList<MessageQueue*> mCommandQueues; 292 QList<MessageQueue *> mCommandQueues;
300 bool mProcessingLock; 293 bool mProcessingLock;
301 //The lowest revision we no longer need 294 // The lowest revision we no longer need
302 qint64 mLowerBoundRevision; 295 qint64 mLowerBoundRevision;
303 InspectionFunction mInspect; 296 InspectionFunction mInspect;
304}; 297};
@@ -308,14 +301,14 @@ private:
308 301
309GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline) 302GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline)
310 : Sink::Resource(), 303 : Sink::Resource(),
311 mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), 304 mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"),
312 mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), 305 mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"),
313 mResourceInstanceIdentifier(resourceInstanceIdentifier), 306 mResourceInstanceIdentifier(resourceInstanceIdentifier),
314 mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)), 307 mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)),
315 mError(0), 308 mError(0),
316 mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) 309 mClientLowerBoundRevision(std::numeric_limits<qint64>::max())
317{ 310{
318 mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); 311 mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue);
319 mProcessor->setInspectionCommand([this](void const *command, size_t size) { 312 mProcessor->setInspectionCommand([this](void const *command, size_t size) {
320 flatbuffers::Verifier verifier((const uint8_t *)command, size); 313 flatbuffers::Verifier verifier((const uint8_t *)command, size);
321 if (Sink::Commands::VerifyInspectionBuffer(verifier)) { 314 if (Sink::Commands::VerifyInspectionBuffer(verifier)) {
@@ -330,22 +323,26 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
330 QDataStream s(expectedValueString); 323 QDataStream s(expectedValueString);
331 QVariant expectedValue; 324 QVariant expectedValue;
332 s >> expectedValue; 325 s >> expectedValue;
333 inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue).then<void>([=]() { 326 inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue)
334 Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; 327 .then<void>(
335 Sink::Notification n; 328 [=]() {
336 n.type = Sink::Commands::NotificationType_Inspection; 329 Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId;
337 n.id = inspectionId; 330 Sink::Notification n;
338 n.code = Sink::Commands::NotificationCode_Success; 331 n.type = Sink::Commands::NotificationType_Inspection;
339 emit notify(n); 332 n.id = inspectionId;
340 }, [=](int code, const QString &message) { 333 n.code = Sink::Commands::NotificationCode_Success;
341 Log() << "Inspection failed: "<< inspectionType << inspectionId << entityId << message; 334 emit notify(n);
342 Sink::Notification n; 335 },
343 n.type = Sink::Commands::NotificationType_Inspection; 336 [=](int code, const QString &message) {
344 n.message = message; 337 Log() << "Inspection failed: " << inspectionType << inspectionId << entityId << message;
345 n.id = inspectionId; 338 Sink::Notification n;
346 n.code = Sink::Commands::NotificationCode_Failure; 339 n.type = Sink::Commands::NotificationType_Inspection;
347 emit notify(n); 340 n.message = message;
348 }).exec(); 341 n.id = inspectionId;
342 n.code = Sink::Commands::NotificationCode_Failure;
343 emit notify(n);
344 })
345 .exec();
349 return KAsync::null<void>(); 346 return KAsync::null<void>();
350 } 347 }
351 return KAsync::error<void>(-1, "Invalid inspection command."); 348 return KAsync::error<void>(-1, "Invalid inspection command.");
@@ -353,9 +350,9 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
353 QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 350 QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
354 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); 351 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
355 mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) { 352 mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) {
356 //This results in a deadlock when a sync is in progress and we try to create a second writing transaction (which is why we turn changereplay off during the sync) 353 // This results in a deadlock when a sync is in progress and we try to create a second writing transaction (which is why we turn changereplay off during the sync)
357 auto synchronizationStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite); 354 auto synchronizationStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite);
358 return this->replay(*synchronizationStore, type, key, value).then<void>([synchronizationStore](){}); 355 return this->replay(*synchronizationStore, type, key, value).then<void>([synchronizationStore]() {});
359 }); 356 });
360 enableChangeReplay(true); 357 enableChangeReplay(true);
361 mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); 358 mClientLowerBoundRevision = mPipeline->cleanedUpRevision();
@@ -372,7 +369,8 @@ GenericResource::~GenericResource()
372 delete mSourceChangeReplay; 369 delete mSourceChangeReplay;
373} 370}
374 371
375KAsync::Job<void> GenericResource::inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) 372KAsync::Job<void> GenericResource::inspect(
373 int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue)
376{ 374{
377 Warning() << "Inspection not implemented"; 375 Warning() << "Inspection not implemented";
378 return KAsync::null<void>(); 376 return KAsync::null<void>();
@@ -390,7 +388,7 @@ void GenericResource::enableChangeReplay(bool enable)
390 } 388 }
391} 389}
392 390
393void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor*> &preprocessors) 391void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors)
394{ 392{
395 mPipeline->setPreprocessors(type, preprocessors); 393 mPipeline->setPreprocessors(type, preprocessors);
396 mPipeline->setAdaptorFactory(type, factory); 394 mPipeline->setAdaptorFactory(type, factory);
@@ -463,14 +461,16 @@ KAsync::Job<void> GenericResource::synchronizeWithSource()
463{ 461{
464 return KAsync::start<void>([this]() { 462 return KAsync::start<void>([this]() {
465 Log() << " Synchronizing"; 463 Log() << " Synchronizing";
466 //Changereplay would deadlock otherwise when trying to open the synchronization store 464 // Changereplay would deadlock otherwise when trying to open the synchronization store
467 enableChangeReplay(false); 465 enableChangeReplay(false);
468 auto mainStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier, Sink::Storage::ReadOnly); 466 auto mainStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier, Sink::Storage::ReadOnly);
469 auto syncStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite); 467 auto syncStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite);
470 synchronizeWithSource(*mainStore, *syncStore).then<void>([this, mainStore, syncStore]() { 468 synchronizeWithSource(*mainStore, *syncStore)
471 Log() << "Done Synchronizing"; 469 .then<void>([this, mainStore, syncStore]() {
472 enableChangeReplay(true); 470 Log() << "Done Synchronizing";
473 }).exec(); 471 enableChangeReplay(true);
472 })
473 .exec();
474 }); 474 });
475} 475}
476 476
@@ -484,42 +484,39 @@ static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue)
484 if (queue.isEmpty()) { 484 if (queue.isEmpty()) {
485 f.setFinished(); 485 f.setFinished();
486 } else { 486 } else {
487 QObject::connect(&queue, &MessageQueue::drained, [&f]() { 487 QObject::connect(&queue, &MessageQueue::drained, [&f]() { f.setFinished(); });
488 f.setFinished();
489 });
490 } 488 }
491}; 489};
492 490
493KAsync::Job<void> GenericResource::processAllMessages() 491KAsync::Job<void> GenericResource::processAllMessages()
494{ 492{
495 //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. 493 // We have to wait for all items to be processed to ensure the synced items are available when a query gets executed.
496 //TODO: report errors while processing sync? 494 // TODO: report errors while processing sync?
497 //TODO JOBAPI: A helper that waits for n events and then continues? 495 // TODO JOBAPI: A helper that waits for n events and then continues?
498 return KAsync::start<void>([this](KAsync::Future<void> &f) { 496 return KAsync::start<void>([this](KAsync::Future<void> &f) {
499 if (mCommitQueueTimer.isActive()) { 497 if (mCommitQueueTimer.isActive()) {
500 auto context = new QObject; 498 auto context = new QObject;
501 QObject::connect(&mCommitQueueTimer, &QTimer::timeout, context, [&f, context]() { 499 QObject::connect(&mCommitQueueTimer, &QTimer::timeout, context, [&f, context]() {
502 delete context; 500 delete context;
501 f.setFinished();
502 });
503 } else {
504 f.setFinished();
505 }
506 })
507 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); })
508 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); })
509 .then<void>([this](KAsync::Future<void> &f) {
510 if (mSourceChangeReplay->allChangesReplayed()) {
503 f.setFinished(); 511 f.setFinished();
504 }); 512 } else {
505 } else { 513 auto context = new QObject;
506 f.setFinished(); 514 QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, context, [&f, context]() {
507 } 515 delete context;
508 }).then<void>([this](KAsync::Future<void> &f) { 516 f.setFinished();
509 waitForDrained(f, mSynchronizerQueue); 517 });
510 }).then<void>([this](KAsync::Future<void> &f) { 518 }
511 waitForDrained(f, mUserQueue); 519 });
512 }).then<void>([this](KAsync::Future<void> &f) {
513 if (mSourceChangeReplay->allChangesReplayed()) {
514 f.setFinished();
515 } else {
516 auto context = new QObject;
517 QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, context, [&f, context]() {
518 delete context;
519 f.setFinished();
520 });
521 }
522 });
523} 520}
524 521
525void GenericResource::updateLowerBoundRevision() 522void GenericResource::updateLowerBoundRevision()
@@ -533,14 +530,15 @@ void GenericResource::setLowerBoundRevision(qint64 revision)
533 updateLowerBoundRevision(); 530 updateLowerBoundRevision();
534} 531}
535 532
536void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) 533void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject,
534 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
537{ 535{
538 //These changes are coming from the source 536 // These changes are coming from the source
539 const auto replayToSource = false; 537 const auto replayToSource = false;
540 flatbuffers::FlatBufferBuilder entityFbb; 538 flatbuffers::FlatBufferBuilder entityFbb;
541 adaptorFactory.createBuffer(domainObject, entityFbb); 539 adaptorFactory.createBuffer(domainObject, entityFbb);
542 flatbuffers::FlatBufferBuilder fbb; 540 flatbuffers::FlatBufferBuilder fbb;
543 //This is the resource type and not the domain type 541 // This is the resource type and not the domain type
544 auto entityId = fbb.CreateString(sinkId.toStdString()); 542 auto entityId = fbb.CreateString(sinkId.toStdString());
545 auto type = fbb.CreateString(bufferType.toStdString()); 543 auto type = fbb.CreateString(bufferType.toStdString());
546 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); 544 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize());
@@ -549,18 +547,19 @@ void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &b
549 callback(BufferUtils::extractBuffer(fbb)); 547 callback(BufferUtils::extractBuffer(fbb));
550} 548}
551 549
552void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) 550void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject,
551 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
553{ 552{
554 //These changes are coming from the source 553 // These changes are coming from the source
555 const auto replayToSource = false; 554 const auto replayToSource = false;
556 flatbuffers::FlatBufferBuilder entityFbb; 555 flatbuffers::FlatBufferBuilder entityFbb;
557 adaptorFactory.createBuffer(domainObject, entityFbb); 556 adaptorFactory.createBuffer(domainObject, entityFbb);
558 flatbuffers::FlatBufferBuilder fbb; 557 flatbuffers::FlatBufferBuilder fbb;
559 auto entityId = fbb.CreateString(sinkId.toStdString()); 558 auto entityId = fbb.CreateString(sinkId.toStdString());
560 //This is the resource type and not the domain type 559 // This is the resource type and not the domain type
561 auto type = fbb.CreateString(bufferType.toStdString()); 560 auto type = fbb.CreateString(bufferType.toStdString());
562 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); 561 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize());
563 //TODO removals 562 // TODO removals
564 auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource); 563 auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource);
565 Sink::Commands::FinishModifyEntityBuffer(fbb, location); 564 Sink::Commands::FinishModifyEntityBuffer(fbb, location);
566 callback(BufferUtils::extractBuffer(fbb)); 565 callback(BufferUtils::extractBuffer(fbb));
@@ -568,11 +567,11 @@ void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, co
568 567
569void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) 568void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback)
570{ 569{
571 //These changes are coming from the source 570 // These changes are coming from the source
572 const auto replayToSource = false; 571 const auto replayToSource = false;
573 flatbuffers::FlatBufferBuilder fbb; 572 flatbuffers::FlatBufferBuilder fbb;
574 auto entityId = fbb.CreateString(sinkId.toStdString()); 573 auto entityId = fbb.CreateString(sinkId.toStdString());
575 //This is the resource type and not the domain type 574 // This is the resource type and not the domain type
576 auto type = fbb.CreateString(bufferType.toStdString()); 575 auto type = fbb.CreateString(bufferType.toStdString());
577 auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); 576 auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource);
578 Sink::Commands::FinishDeleteEntityBuffer(fbb, location); 577 Sink::Commands::FinishDeleteEntityBuffer(fbb, location);
@@ -581,7 +580,8 @@ void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, co
581 580
582void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) 581void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction)
583{ 582{
584 Index("rid.mapping." + bufferType, transaction).add(remoteId, localId);; 583 Index("rid.mapping." + bufferType, transaction).add(remoteId, localId);
584 ;
585 Index("localid.mapping." + bufferType, transaction).add(localId, remoteId); 585 Index("localid.mapping." + bufferType, transaction).add(localId, remoteId);
586} 586}
587 587
@@ -600,7 +600,7 @@ void GenericResource::updateRemoteId(const QByteArray &bufferType, const QByteAr
600 600
601QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) 601QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Sink::Storage::Transaction &transaction)
602{ 602{
603 //Lookup local id for remote id, or insert a new pair otherwise 603 // Lookup local id for remote id, or insert a new pair otherwise
604 Index index("rid.mapping." + bufferType, transaction); 604 Index index("rid.mapping." + bufferType, transaction);
605 QByteArray sinkId = index.lookup(remoteId); 605 QByteArray sinkId = index.lookup(remoteId);
606 if (sinkId.isEmpty()) { 606 if (sinkId.isEmpty()) {
@@ -621,19 +621,19 @@ QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const Q
621 return remoteId; 621 return remoteId;
622} 622}
623 623
624void GenericResource::scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists) 624void GenericResource::scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType,
625 const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists)
625{ 626{
626 entryGenerator([this, &transaction, bufferType, &synchronizationTransaction, &exists](const QByteArray &key) { 627 entryGenerator([this, &transaction, bufferType, &synchronizationTransaction, &exists](const QByteArray &key) {
627 auto sinkId = Sink::Storage::uidFromKey(key); 628 auto sinkId = Sink::Storage::uidFromKey(key);
628 Trace() << "Checking for removal " << key; 629 Trace() << "Checking for removal " << key;
629 const auto remoteId = resolveLocalId(bufferType, sinkId, synchronizationTransaction); 630 const auto remoteId = resolveLocalId(bufferType, sinkId, synchronizationTransaction);
630 //If we have no remoteId, the entity hasn't been replayed to the source yet 631 // If we have no remoteId, the entity hasn't been replayed to the source yet
631 if (!remoteId.isEmpty()) { 632 if (!remoteId.isEmpty()) {
632 if (!exists(remoteId)) { 633 if (!exists(remoteId)) {
633 Trace() << "Found a removed entity: " << sinkId; 634 Trace() << "Found a removed entity: " << sinkId;
634 deleteEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, [this](const QByteArray &buffer) { 635 deleteEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType,
635 enqueueCommand(mSynchronizerQueue, Sink::Commands::DeleteEntityCommand, buffer); 636 [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::DeleteEntityCommand, buffer); });
636 });
637 } 637 }
638 } 638 }
639 }); 639 });
@@ -642,32 +642,31 @@ void GenericResource::scanForRemovals(Sink::Storage::Transaction &transaction, S
642static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) 642static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory)
643{ 643{
644 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; 644 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current;
645 db.findLatest(uid, [&current, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { 645 db.findLatest(uid,
646 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 646 [&current, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
647 if (!buffer.isValid()) { 647 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
648 Warning() << "Read invalid buffer from disk"; 648 if (!buffer.isValid()) {
649 } else { 649 Warning() << "Read invalid buffer from disk";
650 current = adaptorFactory.createAdaptor(buffer.entity()); 650 } else {
651 } 651 current = adaptorFactory.createAdaptor(buffer.entity());
652 return false; 652 }
653 }, 653 return false;
654 [](const Sink::Storage::Error &error) { 654 },
655 Warning() << "Failed to read current value from storage: " << error.message; 655 [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; });
656 });
657 return current; 656 return current;
658} 657}
659 658
660void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) 659void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction,
660 DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
661{ 661{
662 auto mainDatabase = Storage::mainDatabase(transaction, bufferType); 662 auto mainDatabase = Storage::mainDatabase(transaction, bufferType);
663 const auto sinkId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction); 663 const auto sinkId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction);
664 const auto found = mainDatabase.contains(sinkId); 664 const auto found = mainDatabase.contains(sinkId);
665 if (!found) { 665 if (!found) {
666 Trace() << "Found a new entity: " << remoteId; 666 Trace() << "Found a new entity: " << remoteId;
667 createEntity(sinkId, bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { 667 createEntity(
668 enqueueCommand(mSynchronizerQueue, Sink::Commands::CreateEntityCommand, buffer); 668 sinkId, bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::CreateEntityCommand, buffer); });
669 }); 669 } else { // modification
670 } else { //modification
671 if (auto current = getLatest(mainDatabase, sinkId, adaptorFactory)) { 670 if (auto current = getLatest(mainDatabase, sinkId, adaptorFactory)) {
672 bool changed = false; 671 bool changed = false;
673 for (const auto &property : entity.changedProperties()) { 672 for (const auto &property : entity.changedProperties()) {
@@ -678,9 +677,8 @@ void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Si
678 } 677 }
679 if (changed) { 678 if (changed) {
680 Trace() << "Found a modified entity: " << remoteId; 679 Trace() << "Found a modified entity: " << remoteId;
681 modifyEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { 680 modifyEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, entity, adaptorFactory,
682 enqueueCommand(mSynchronizerQueue, Sink::Commands::ModifyEntityCommand, buffer); 681 [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::ModifyEntityCommand, buffer); });
683 });
684 } 682 }
685 } else { 683 } else {
686 Warning() << "Failed to get current entity"; 684 Warning() << "Failed to get current entity";