summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp77
1 files changed, 53 insertions, 24 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index c7326d3..2688df0 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -17,11 +17,17 @@
17 17
18#include <QUuid> 18#include <QUuid>
19#include <QDataStream> 19#include <QDataStream>
20#include <QTime>
20 21
21static int sBatchSize = 100; 22static int sBatchSize = 100;
23//This interval directly affects the roundtrip time of single commands
24static int sCommitInterval = 10;
22 25
23using namespace Sink; 26using namespace Sink;
24 27
28#undef DEBUG_AREA
29#define DEBUG_AREA "resource.changereplay"
30
25/** 31/**
26 * Replays changes from the storage one by one. 32 * Replays changes from the storage one by one.
27 * 33 *
@@ -87,7 +93,7 @@ public Q_SLOTS:
87 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); 93 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision);
88 const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); 94 const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision);
89 const auto key = Storage::assembleKey(uid, revision); 95 const auto key = Storage::assembleKey(uid, revision);
90 mainStoreTransaction.openDatabase(type + ".main").scan(key, [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { 96 Storage::mainDatabase(mainStoreTransaction, type).scan(key, [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool {
91 mReplayFunction(type, key, value).exec(); 97 mReplayFunction(type, key, value).exec();
92 //TODO make for loop async, and pass to async replay function together with type 98 //TODO make for loop async, and pass to async replay function together with type
93 Trace() << "Replaying " << key; 99 Trace() << "Replaying " << key;
@@ -110,6 +116,9 @@ private:
110 ReplayFunction mReplayFunction; 116 ReplayFunction mReplayFunction;
111}; 117};
112 118
119#undef DEBUG_AREA
120#define DEBUG_AREA "resource.commandprocessor"
121
113/** 122/**
114 * Drives the pipeline using the output from all command queues 123 * Drives the pipeline using the output from all command queues
115 */ 124 */
@@ -197,7 +206,6 @@ private slots:
197 default: 206 default:
198 return KAsync::error<qint64>(-1, "Unhandled command"); 207 return KAsync::error<qint64>(-1, "Unhandled command");
199 } 208 }
200 return KAsync::null<qint64>();
201 } 209 }
202 210
203 KAsync::Job<qint64, qint64> processQueuedCommand(const QByteArray &data) 211 KAsync::Job<qint64, qint64> processQueuedCommand(const QByteArray &data)
@@ -226,15 +234,17 @@ private slots:
226 //Process all messages of this queue 234 //Process all messages of this queue
227 KAsync::Job<void> processQueue(MessageQueue *queue) 235 KAsync::Job<void> processQueue(MessageQueue *queue)
228 { 236 {
237 auto time = QSharedPointer<QTime>::create();
229 return KAsync::start<void>([this](){ 238 return KAsync::start<void>([this](){
230 mPipeline->startTransaction(); 239 mPipeline->startTransaction();
231 }).then(KAsync::dowhile( 240 }).then(KAsync::dowhile(
232 [queue]() { return !queue->isEmpty(); }, 241 [queue]() { return !queue->isEmpty(); },
233 [this, queue](KAsync::Future<void> &future) { 242 [this, queue, time](KAsync::Future<void> &future) {
234 queue->dequeueBatch(sBatchSize, [this](const QByteArray &data) { 243 queue->dequeueBatch(sBatchSize, [this, time](const QByteArray &data) {
235 return KAsync::start<void>([this, data](KAsync::Future<void> &future) { 244 time->start();
236 processQueuedCommand(data).then<void, qint64>([&future, this](qint64 createdRevision) { 245 return KAsync::start<void>([this, data, time](KAsync::Future<void> &future) {
237 Trace() << "Created revision " << createdRevision; 246 processQueuedCommand(data).then<void, qint64>([&future, this, time](qint64 createdRevision) {
247 Trace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed());
238 future.setFinished(); 248 future.setFinished();
239 }).exec(); 249 }).exec();
240 }); 250 });
@@ -256,21 +266,27 @@ private slots:
256 266
257 KAsync::Job<void> processPipeline() 267 KAsync::Job<void> processPipeline()
258 { 268 {
269 auto time = QSharedPointer<QTime>::create();
270 time->start();
259 mPipeline->startTransaction(); 271 mPipeline->startTransaction();
260 Trace() << "Cleaning up from " << mPipeline->cleanedUpRevision() + 1 << " to " << mLowerBoundRevision; 272 Trace() << "Cleaning up from " << mPipeline->cleanedUpRevision() + 1 << " to " << mLowerBoundRevision;
261 for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) { 273 for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) {
262 mPipeline->cleanupRevision(revision); 274 mPipeline->cleanupRevision(revision);
263 } 275 }
264 mPipeline->commit(); 276 mPipeline->commit();
277 Trace() << "Cleanup done." << Log::TraceTime(time->elapsed());
265 278
266 //Go through all message queues 279 //Go through all message queues
267 auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); 280 auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues);
268 return KAsync::dowhile( 281 return KAsync::dowhile(
269 [it]() { return it->hasNext(); }, 282 [it]() { return it->hasNext(); },
270 [it, this](KAsync::Future<void> &future) { 283 [it, this](KAsync::Future<void> &future) {
284 auto time = QSharedPointer<QTime>::create();
285 time->start();
286
271 auto queue = it->next(); 287 auto queue = it->next();
272 processQueue(queue).then<void>([&future]() { 288 processQueue(queue).then<void>([&future, time]() {
273 Trace() << "Queue processed"; 289 Trace() << "Queue processed." << Log::TraceTime(time->elapsed());
274 future.setFinished(); 290 future.setFinished();
275 }).exec(); 291 }).exec();
276 } 292 }
@@ -287,6 +303,8 @@ private:
287 InspectionFunction mInspect; 303 InspectionFunction mInspect;
288}; 304};
289 305
306#undef DEBUG_AREA
307#define DEBUG_AREA "resource"
290 308
291GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline) 309GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline)
292 : Sink::Resource(), 310 : Sink::Resource(),
@@ -313,12 +331,14 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
313 QVariant expectedValue; 331 QVariant expectedValue;
314 s >> expectedValue; 332 s >> expectedValue;
315 inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue).then<void>([=]() { 333 inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue).then<void>([=]() {
334 Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId;
316 Sink::Notification n; 335 Sink::Notification n;
317 n.type = Sink::Commands::NotificationType_Inspection; 336 n.type = Sink::Commands::NotificationType_Inspection;
318 n.id = inspectionId; 337 n.id = inspectionId;
319 n.code = Sink::Commands::NotificationCode_Success; 338 n.code = Sink::Commands::NotificationCode_Success;
320 emit notify(n); 339 emit notify(n);
321 }, [=](int code, const QString &message) { 340 }, [=](int code, const QString &message) {
341 Log() << "Inspection failed: "<< inspectionType << inspectionId << entityId << message;
322 Sink::Notification n; 342 Sink::Notification n;
323 n.type = Sink::Commands::NotificationType_Inspection; 343 n.type = Sink::Commands::NotificationType_Inspection;
324 n.message = message; 344 n.message = message;
@@ -341,7 +361,7 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
341 mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); 361 mClientLowerBoundRevision = mPipeline->cleanedUpRevision();
342 mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision()); 362 mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision());
343 363
344 mCommitQueueTimer.setInterval(100); 364 mCommitQueueTimer.setInterval(sCommitInterval);
345 mCommitQueueTimer.setSingleShot(true); 365 mCommitQueueTimer.setSingleShot(true);
346 QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); 366 QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit);
347} 367}
@@ -381,13 +401,18 @@ KAsync::Job<void> GenericResource::replay(Sink::Storage &synchronizationStore, c
381 return KAsync::null<void>(); 401 return KAsync::null<void>();
382} 402}
383 403
404void GenericResource::removeDataFromDisk()
405{
406 removeFromDisk(mResourceInstanceIdentifier);
407}
408
384void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) 409void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier)
385{ 410{
386 Warning() << "Removing from generic resource";
387 Sink::Storage(Sink::storageLocation(), instanceIdentifier, Sink::Storage::ReadWrite).removeFromDisk(); 411 Sink::Storage(Sink::storageLocation(), instanceIdentifier, Sink::Storage::ReadWrite).removeFromDisk();
388 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::ReadWrite).removeFromDisk(); 412 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::ReadWrite).removeFromDisk();
389 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::ReadWrite).removeFromDisk(); 413 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::ReadWrite).removeFromDisk();
390 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::ReadWrite).removeFromDisk(); 414 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::ReadWrite).removeFromDisk();
415 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::ReadWrite).removeFromDisk();
391} 416}
392 417
393qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier) 418qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier)
@@ -556,38 +581,39 @@ void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, co
556 581
557void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) 582void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction)
558{ 583{
559 Index index("rid.mapping." + bufferType, transaction); 584 Index("rid.mapping." + bufferType, transaction).add(remoteId, localId);;
560 Index localIndex("localid.mapping." + bufferType, transaction); 585 Index("localid.mapping." + bufferType, transaction).add(localId, remoteId);
561 index.add(remoteId, localId);
562 localIndex.add(localId, remoteId);
563} 586}
564 587
565void GenericResource::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) 588void GenericResource::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction)
566{ 589{
567 Index index("rid.mapping." + bufferType, transaction); 590 Index("rid.mapping." + bufferType, transaction).remove(remoteId, localId);
568 Index localIndex("localid.mapping." + bufferType, transaction); 591 Index("localid.mapping." + bufferType, transaction).remove(localId, remoteId);
569 index.remove(remoteId, localId); 592}
570 localIndex.remove(localId, remoteId); 593
594void GenericResource::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction)
595{
596 const auto oldRemoteId = Index("localid.mapping." + bufferType, transaction).lookup(localId);
597 removeRemoteId(bufferType, localId, oldRemoteId, transaction);
598 recordRemoteId(bufferType, localId, remoteId, transaction);
571} 599}
572 600
573QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) 601QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Sink::Storage::Transaction &transaction)
574{ 602{
575 //Lookup local id for remote id, or insert a new pair otherwise 603 //Lookup local id for remote id, or insert a new pair otherwise
576 Index index("rid.mapping." + bufferType, transaction); 604 Index index("rid.mapping." + bufferType, transaction);
577 Index localIndex("localid.mapping." + bufferType, transaction);
578 QByteArray sinkId = index.lookup(remoteId); 605 QByteArray sinkId = index.lookup(remoteId);
579 if (sinkId.isEmpty()) { 606 if (sinkId.isEmpty()) {
580 sinkId = QUuid::createUuid().toString().toUtf8(); 607 sinkId = QUuid::createUuid().toString().toUtf8();
581 index.add(remoteId, sinkId); 608 index.add(remoteId, sinkId);
582 localIndex.add(sinkId, remoteId); 609 Index("localid.mapping." + bufferType, transaction).add(sinkId, remoteId);
583 } 610 }
584 return sinkId; 611 return sinkId;
585} 612}
586 613
587QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Sink::Storage::Transaction &transaction) 614QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Sink::Storage::Transaction &transaction)
588{ 615{
589 Index index("localid.mapping." + bufferType, transaction); 616 QByteArray remoteId = Index("localid.mapping." + bufferType, transaction).lookup(localId);
590 QByteArray remoteId = index.lookup(localId);
591 if (remoteId.isEmpty()) { 617 if (remoteId.isEmpty()) {
592 Warning() << "Couldn't find the remote id for " << localId; 618 Warning() << "Couldn't find the remote id for " << localId;
593 return QByteArray(); 619 return QByteArray();
@@ -633,7 +659,7 @@ static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Si
633 659
634void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) 660void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
635{ 661{
636 auto mainDatabase = transaction.openDatabase(bufferType + ".main"); 662 auto mainDatabase = Storage::mainDatabase(transaction, bufferType);
637 const auto sinkId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction); 663 const auto sinkId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction);
638 const auto found = mainDatabase.contains(sinkId); 664 const auto found = mainDatabase.contains(sinkId);
639 if (!found) { 665 if (!found) {
@@ -663,4 +689,7 @@ void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Si
663} 689}
664 690
665 691
692#pragma clang diagnostic push
693#pragma clang diagnostic ignored "-Wundefined-reinterpret-cast"
666#include "genericresource.moc" 694#include "genericresource.moc"
695#pragma clang diagnostic pop