diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-09-29 00:52:07 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-09-29 00:52:07 +0200 |
commit | c3f6e72c2d46906a4699127b558ca248729ce577 (patch) | |
tree | a2066e43b45a01226e19bbb309fabf7064018a06 /common/genericresource.cpp | |
parent | b43c422a2b1b899ce5ac27a0bc381e8a49f05d86 (diff) | |
download | sink-c3f6e72c2d46906a4699127b558ca248729ce577.tar.gz sink-c3f6e72c2d46906a4699127b558ca248729ce577.zip |
Revision cleanup
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r-- | common/genericresource.cpp | 31 |
1 files changed, 21 insertions, 10 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 3ffc56b..4abcecd 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -60,7 +60,7 @@ private slots: | |||
60 | }).exec(); | 60 | }).exec(); |
61 | } | 61 | } |
62 | 62 | ||
63 | KAsync::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) | 63 | KAsync::Job<qint64> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) |
64 | { | 64 | { |
65 | Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId()); | 65 | Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId()); |
66 | //Throw command into appropriate pipeline | 66 | //Throw command into appropriate pipeline |
@@ -72,25 +72,27 @@ private slots: | |||
72 | case Akonadi2::Commands::CreateEntityCommand: | 72 | case Akonadi2::Commands::CreateEntityCommand: |
73 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); | 73 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); |
74 | default: | 74 | default: |
75 | return KAsync::error<void>(-1, "Unhandled command"); | 75 | return KAsync::error<qint64>(-1, "Unhandled command"); |
76 | } | 76 | } |
77 | return KAsync::null<void>(); | 77 | return KAsync::null<qint64>(); |
78 | } | 78 | } |
79 | 79 | ||
80 | KAsync::Job<void> processQueuedCommand(const QByteArray &data) | 80 | KAsync::Job<qint64, qint64> processQueuedCommand(const QByteArray &data) |
81 | { | 81 | { |
82 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); | 82 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); |
83 | if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { | 83 | if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { |
84 | Warning() << "invalid buffer"; | 84 | Warning() << "invalid buffer"; |
85 | return KAsync::error<void>(1, "Invalid Buffer"); | 85 | // return KAsync::error<void, qint64>(1, "Invalid Buffer"); |
86 | } | 86 | } |
87 | auto queuedCommand = Akonadi2::GetQueuedCommand(data.constData()); | 87 | auto queuedCommand = Akonadi2::GetQueuedCommand(data.constData()); |
88 | const auto commandId = queuedCommand->commandId(); | 88 | const auto commandId = queuedCommand->commandId(); |
89 | Trace() << "Dequeued Command: " << Akonadi2::Commands::name(commandId); | 89 | Trace() << "Dequeued Command: " << Akonadi2::Commands::name(commandId); |
90 | return processQueuedCommand(queuedCommand).then<void>( | 90 | return processQueuedCommand(queuedCommand).then<qint64, qint64>( |
91 | [commandId]() { | 91 | [commandId](qint64 createdRevision) -> qint64 { |
92 | Trace() << "Command pipeline processed: " << Akonadi2::Commands::name(commandId); | 92 | Trace() << "Command pipeline processed: " << Akonadi2::Commands::name(commandId); |
93 | }, | 93 | return createdRevision; |
94 | } | ||
95 | , | ||
94 | [](int errorCode, QString errorMessage) { | 96 | [](int errorCode, QString errorMessage) { |
95 | //FIXME propagate error, we didn't handle it | 97 | //FIXME propagate error, we didn't handle it |
96 | Warning() << "Error while processing queue command: " << errorMessage; | 98 | Warning() << "Error while processing queue command: " << errorMessage; |
@@ -106,8 +108,17 @@ private slots: | |||
106 | }).then(KAsync::dowhile( | 108 | }).then(KAsync::dowhile( |
107 | [queue]() { return !queue->isEmpty(); }, | 109 | [queue]() { return !queue->isEmpty(); }, |
108 | [this, queue](KAsync::Future<void> &future) { | 110 | [this, queue](KAsync::Future<void> &future) { |
109 | queue->dequeueBatch(100, [this](const QByteArray &data) { | 111 | const int batchSize = 100; |
110 | return processQueuedCommand(data); | 112 | queue->dequeueBatch(batchSize, [this](const QByteArray &data) { |
113 | return KAsync::start<void>([this, data](KAsync::Future<void> &future) { | ||
114 | processQueuedCommand(data).then<void, qint64>([&future, this](qint64 createdRevision) { | ||
115 | Trace() << "Created revision " << createdRevision; | ||
116 | //We don't have a writeback yet, so we cleanup revisions immediately | ||
117 | //TODO: only cleanup once writeback is done | ||
118 | mPipeline->cleanupRevision(createdRevision); | ||
119 | future.setFinished(); | ||
120 | }).exec(); | ||
121 | }); | ||
111 | } | 122 | } |
112 | ).then<void>([&future, queue](){ | 123 | ).then<void>([&future, queue](){ |
113 | future.setFinished(); | 124 | future.setFinished(); |