diff options
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r-- | common/genericresource.cpp | 77 |
1 files changed, 53 insertions, 24 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index c7326d3..2688df0 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -17,11 +17,17 @@ | |||
17 | 17 | ||
18 | #include <QUuid> | 18 | #include <QUuid> |
19 | #include <QDataStream> | 19 | #include <QDataStream> |
20 | #include <QTime> | ||
20 | 21 | ||
21 | static int sBatchSize = 100; | 22 | static int sBatchSize = 100; |
23 | //This interval directly affects the roundtrip time of single commands | ||
24 | static int sCommitInterval = 10; | ||
22 | 25 | ||
23 | using namespace Sink; | 26 | using namespace Sink; |
24 | 27 | ||
28 | #undef DEBUG_AREA | ||
29 | #define DEBUG_AREA "resource.changereplay" | ||
30 | |||
25 | /** | 31 | /** |
26 | * Replays changes from the storage one by one. | 32 | * Replays changes from the storage one by one. |
27 | * | 33 | * |
@@ -87,7 +93,7 @@ public Q_SLOTS: | |||
87 | const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); | 93 | const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); |
88 | const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); | 94 | const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); |
89 | const auto key = Storage::assembleKey(uid, revision); | 95 | const auto key = Storage::assembleKey(uid, revision); |
90 | mainStoreTransaction.openDatabase(type + ".main").scan(key, [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { | 96 | Storage::mainDatabase(mainStoreTransaction, type).scan(key, [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { |
91 | mReplayFunction(type, key, value).exec(); | 97 | mReplayFunction(type, key, value).exec(); |
92 | //TODO make for loop async, and pass to async replay function together with type | 98 | //TODO make for loop async, and pass to async replay function together with type |
93 | Trace() << "Replaying " << key; | 99 | Trace() << "Replaying " << key; |
@@ -110,6 +116,9 @@ private: | |||
110 | ReplayFunction mReplayFunction; | 116 | ReplayFunction mReplayFunction; |
111 | }; | 117 | }; |
112 | 118 | ||
119 | #undef DEBUG_AREA | ||
120 | #define DEBUG_AREA "resource.commandprocessor" | ||
121 | |||
113 | /** | 122 | /** |
114 | * Drives the pipeline using the output from all command queues | 123 | * Drives the pipeline using the output from all command queues |
115 | */ | 124 | */ |
@@ -197,7 +206,6 @@ private slots: | |||
197 | default: | 206 | default: |
198 | return KAsync::error<qint64>(-1, "Unhandled command"); | 207 | return KAsync::error<qint64>(-1, "Unhandled command"); |
199 | } | 208 | } |
200 | return KAsync::null<qint64>(); | ||
201 | } | 209 | } |
202 | 210 | ||
203 | KAsync::Job<qint64, qint64> processQueuedCommand(const QByteArray &data) | 211 | KAsync::Job<qint64, qint64> processQueuedCommand(const QByteArray &data) |
@@ -226,15 +234,17 @@ private slots: | |||
226 | //Process all messages of this queue | 234 | //Process all messages of this queue |
227 | KAsync::Job<void> processQueue(MessageQueue *queue) | 235 | KAsync::Job<void> processQueue(MessageQueue *queue) |
228 | { | 236 | { |
237 | auto time = QSharedPointer<QTime>::create(); | ||
229 | return KAsync::start<void>([this](){ | 238 | return KAsync::start<void>([this](){ |
230 | mPipeline->startTransaction(); | 239 | mPipeline->startTransaction(); |
231 | }).then(KAsync::dowhile( | 240 | }).then(KAsync::dowhile( |
232 | [queue]() { return !queue->isEmpty(); }, | 241 | [queue]() { return !queue->isEmpty(); }, |
233 | [this, queue](KAsync::Future<void> &future) { | 242 | [this, queue, time](KAsync::Future<void> &future) { |
234 | queue->dequeueBatch(sBatchSize, [this](const QByteArray &data) { | 243 | queue->dequeueBatch(sBatchSize, [this, time](const QByteArray &data) { |
235 | return KAsync::start<void>([this, data](KAsync::Future<void> &future) { | 244 | time->start(); |
236 | processQueuedCommand(data).then<void, qint64>([&future, this](qint64 createdRevision) { | 245 | return KAsync::start<void>([this, data, time](KAsync::Future<void> &future) { |
237 | Trace() << "Created revision " << createdRevision; | 246 | processQueuedCommand(data).then<void, qint64>([&future, this, time](qint64 createdRevision) { |
247 | Trace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); | ||
238 | future.setFinished(); | 248 | future.setFinished(); |
239 | }).exec(); | 249 | }).exec(); |
240 | }); | 250 | }); |
@@ -256,21 +266,27 @@ private slots: | |||
256 | 266 | ||
257 | KAsync::Job<void> processPipeline() | 267 | KAsync::Job<void> processPipeline() |
258 | { | 268 | { |
269 | auto time = QSharedPointer<QTime>::create(); | ||
270 | time->start(); | ||
259 | mPipeline->startTransaction(); | 271 | mPipeline->startTransaction(); |
260 | Trace() << "Cleaning up from " << mPipeline->cleanedUpRevision() + 1 << " to " << mLowerBoundRevision; | 272 | Trace() << "Cleaning up from " << mPipeline->cleanedUpRevision() + 1 << " to " << mLowerBoundRevision; |
261 | for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) { | 273 | for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) { |
262 | mPipeline->cleanupRevision(revision); | 274 | mPipeline->cleanupRevision(revision); |
263 | } | 275 | } |
264 | mPipeline->commit(); | 276 | mPipeline->commit(); |
277 | Trace() << "Cleanup done." << Log::TraceTime(time->elapsed()); | ||
265 | 278 | ||
266 | //Go through all message queues | 279 | //Go through all message queues |
267 | auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); | 280 | auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); |
268 | return KAsync::dowhile( | 281 | return KAsync::dowhile( |
269 | [it]() { return it->hasNext(); }, | 282 | [it]() { return it->hasNext(); }, |
270 | [it, this](KAsync::Future<void> &future) { | 283 | [it, this](KAsync::Future<void> &future) { |
284 | auto time = QSharedPointer<QTime>::create(); | ||
285 | time->start(); | ||
286 | |||
271 | auto queue = it->next(); | 287 | auto queue = it->next(); |
272 | processQueue(queue).then<void>([&future]() { | 288 | processQueue(queue).then<void>([&future, time]() { |
273 | Trace() << "Queue processed"; | 289 | Trace() << "Queue processed." << Log::TraceTime(time->elapsed()); |
274 | future.setFinished(); | 290 | future.setFinished(); |
275 | }).exec(); | 291 | }).exec(); |
276 | } | 292 | } |
@@ -287,6 +303,8 @@ private: | |||
287 | InspectionFunction mInspect; | 303 | InspectionFunction mInspect; |
288 | }; | 304 | }; |
289 | 305 | ||
306 | #undef DEBUG_AREA | ||
307 | #define DEBUG_AREA "resource" | ||
290 | 308 | ||
291 | GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline) | 309 | GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline) |
292 | : Sink::Resource(), | 310 | : Sink::Resource(), |
@@ -313,12 +331,14 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
313 | QVariant expectedValue; | 331 | QVariant expectedValue; |
314 | s >> expectedValue; | 332 | s >> expectedValue; |
315 | inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue).then<void>([=]() { | 333 | inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue).then<void>([=]() { |
334 | Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; | ||
316 | Sink::Notification n; | 335 | Sink::Notification n; |
317 | n.type = Sink::Commands::NotificationType_Inspection; | 336 | n.type = Sink::Commands::NotificationType_Inspection; |
318 | n.id = inspectionId; | 337 | n.id = inspectionId; |
319 | n.code = Sink::Commands::NotificationCode_Success; | 338 | n.code = Sink::Commands::NotificationCode_Success; |
320 | emit notify(n); | 339 | emit notify(n); |
321 | }, [=](int code, const QString &message) { | 340 | }, [=](int code, const QString &message) { |
341 | Log() << "Inspection failed: "<< inspectionType << inspectionId << entityId << message; | ||
322 | Sink::Notification n; | 342 | Sink::Notification n; |
323 | n.type = Sink::Commands::NotificationType_Inspection; | 343 | n.type = Sink::Commands::NotificationType_Inspection; |
324 | n.message = message; | 344 | n.message = message; |
@@ -341,7 +361,7 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
341 | mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); | 361 | mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); |
342 | mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision()); | 362 | mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision()); |
343 | 363 | ||
344 | mCommitQueueTimer.setInterval(100); | 364 | mCommitQueueTimer.setInterval(sCommitInterval); |
345 | mCommitQueueTimer.setSingleShot(true); | 365 | mCommitQueueTimer.setSingleShot(true); |
346 | QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); | 366 | QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); |
347 | } | 367 | } |
@@ -381,13 +401,18 @@ KAsync::Job<void> GenericResource::replay(Sink::Storage &synchronizationStore, c | |||
381 | return KAsync::null<void>(); | 401 | return KAsync::null<void>(); |
382 | } | 402 | } |
383 | 403 | ||
404 | void GenericResource::removeDataFromDisk() | ||
405 | { | ||
406 | removeFromDisk(mResourceInstanceIdentifier); | ||
407 | } | ||
408 | |||
384 | void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) | 409 | void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) |
385 | { | 410 | { |
386 | Warning() << "Removing from generic resource"; | ||
387 | Sink::Storage(Sink::storageLocation(), instanceIdentifier, Sink::Storage::ReadWrite).removeFromDisk(); | 411 | Sink::Storage(Sink::storageLocation(), instanceIdentifier, Sink::Storage::ReadWrite).removeFromDisk(); |
388 | Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::ReadWrite).removeFromDisk(); | 412 | Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::ReadWrite).removeFromDisk(); |
389 | Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::ReadWrite).removeFromDisk(); | 413 | Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::ReadWrite).removeFromDisk(); |
390 | Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::ReadWrite).removeFromDisk(); | 414 | Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::ReadWrite).removeFromDisk(); |
415 | Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::ReadWrite).removeFromDisk(); | ||
391 | } | 416 | } |
392 | 417 | ||
393 | qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier) | 418 | qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier) |
@@ -556,38 +581,39 @@ void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, co | |||
556 | 581 | ||
557 | void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) | 582 | void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) |
558 | { | 583 | { |
559 | Index index("rid.mapping." + bufferType, transaction); | 584 | Index("rid.mapping." + bufferType, transaction).add(remoteId, localId);; |
560 | Index localIndex("localid.mapping." + bufferType, transaction); | 585 | Index("localid.mapping." + bufferType, transaction).add(localId, remoteId); |
561 | index.add(remoteId, localId); | ||
562 | localIndex.add(localId, remoteId); | ||
563 | } | 586 | } |
564 | 587 | ||
565 | void GenericResource::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) | 588 | void GenericResource::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) |
566 | { | 589 | { |
567 | Index index("rid.mapping." + bufferType, transaction); | 590 | Index("rid.mapping." + bufferType, transaction).remove(remoteId, localId); |
568 | Index localIndex("localid.mapping." + bufferType, transaction); | 591 | Index("localid.mapping." + bufferType, transaction).remove(localId, remoteId); |
569 | index.remove(remoteId, localId); | 592 | } |
570 | localIndex.remove(localId, remoteId); | 593 | |
594 | void GenericResource::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) | ||
595 | { | ||
596 | const auto oldRemoteId = Index("localid.mapping." + bufferType, transaction).lookup(localId); | ||
597 | removeRemoteId(bufferType, localId, oldRemoteId, transaction); | ||
598 | recordRemoteId(bufferType, localId, remoteId, transaction); | ||
571 | } | 599 | } |
572 | 600 | ||
573 | QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) | 601 | QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) |
574 | { | 602 | { |
575 | //Lookup local id for remote id, or insert a new pair otherwise | 603 | //Lookup local id for remote id, or insert a new pair otherwise |
576 | Index index("rid.mapping." + bufferType, transaction); | 604 | Index index("rid.mapping." + bufferType, transaction); |
577 | Index localIndex("localid.mapping." + bufferType, transaction); | ||
578 | QByteArray sinkId = index.lookup(remoteId); | 605 | QByteArray sinkId = index.lookup(remoteId); |
579 | if (sinkId.isEmpty()) { | 606 | if (sinkId.isEmpty()) { |
580 | sinkId = QUuid::createUuid().toString().toUtf8(); | 607 | sinkId = QUuid::createUuid().toString().toUtf8(); |
581 | index.add(remoteId, sinkId); | 608 | index.add(remoteId, sinkId); |
582 | localIndex.add(sinkId, remoteId); | 609 | Index("localid.mapping." + bufferType, transaction).add(sinkId, remoteId); |
583 | } | 610 | } |
584 | return sinkId; | 611 | return sinkId; |
585 | } | 612 | } |
586 | 613 | ||
587 | QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Sink::Storage::Transaction &transaction) | 614 | QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Sink::Storage::Transaction &transaction) |
588 | { | 615 | { |
589 | Index index("localid.mapping." + bufferType, transaction); | 616 | QByteArray remoteId = Index("localid.mapping." + bufferType, transaction).lookup(localId); |
590 | QByteArray remoteId = index.lookup(localId); | ||
591 | if (remoteId.isEmpty()) { | 617 | if (remoteId.isEmpty()) { |
592 | Warning() << "Couldn't find the remote id for " << localId; | 618 | Warning() << "Couldn't find the remote id for " << localId; |
593 | return QByteArray(); | 619 | return QByteArray(); |
@@ -633,7 +659,7 @@ static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Si | |||
633 | 659 | ||
634 | void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) | 660 | void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) |
635 | { | 661 | { |
636 | auto mainDatabase = transaction.openDatabase(bufferType + ".main"); | 662 | auto mainDatabase = Storage::mainDatabase(transaction, bufferType); |
637 | const auto sinkId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction); | 663 | const auto sinkId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction); |
638 | const auto found = mainDatabase.contains(sinkId); | 664 | const auto found = mainDatabase.contains(sinkId); |
639 | if (!found) { | 665 | if (!found) { |
@@ -663,4 +689,7 @@ void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Si | |||
663 | } | 689 | } |
664 | 690 | ||
665 | 691 | ||
692 | #pragma clang diagnostic push | ||
693 | #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" | ||
666 | #include "genericresource.moc" | 694 | #include "genericresource.moc" |
695 | #pragma clang diagnostic pop | ||