diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-03-03 09:01:05 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-03-03 09:01:05 +0100 |
commit | 4d9746c828558c9f872e0aed52442863affb25d5 (patch) | |
tree | 507d7c2ba67f47d3cbbcf01a722236ff1b48426b /common/genericresource.cpp | |
parent | 9cea920b7dd51867a0be0fed2f461b6be73c103e (diff) | |
download | sink-4d9746c828558c9f872e0aed52442863affb25d5.tar.gz sink-4d9746c828558c9f872e0aed52442863affb25d5.zip |
Fromatted the whole codebase with clang-format.
clang-format -i */**{.cpp,.h}
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r-- | common/genericresource.cpp | 378 |
1 files changed, 188 insertions, 190 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 74a8cfb..9c9a12f 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -20,7 +20,7 @@ | |||
20 | #include <QTime> | 20 | #include <QTime> |
21 | 21 | ||
22 | static int sBatchSize = 100; | 22 | static int sBatchSize = 100; |
23 | //This interval directly affects the roundtrip time of single commands | 23 | // This interval directly affects the roundtrip time of single commands |
24 | static int sCommitInterval = 10; | 24 | static int sCommitInterval = 10; |
25 | 25 | ||
26 | using namespace Sink; | 26 | using namespace Sink; |
@@ -39,26 +39,23 @@ class ChangeReplay : public QObject | |||
39 | { | 39 | { |
40 | Q_OBJECT | 40 | Q_OBJECT |
41 | public: | 41 | public: |
42 | |||
43 | typedef std::function<KAsync::Job<void>(const QByteArray &type, const QByteArray &key, const QByteArray &value)> ReplayFunction; | 42 | typedef std::function<KAsync::Job<void>(const QByteArray &type, const QByteArray &key, const QByteArray &value)> ReplayFunction; |
44 | 43 | ||
45 | ChangeReplay(const QString &resourceName, const ReplayFunction &replayFunction) | 44 | ChangeReplay(const QString &resourceName, const ReplayFunction &replayFunction) |
46 | : mStorage(storageLocation(), resourceName, Storage::ReadOnly), | 45 | : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayFunction(replayFunction) |
47 | mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), | ||
48 | mReplayFunction(replayFunction) | ||
49 | { | 46 | { |
50 | |||
51 | } | 47 | } |
52 | 48 | ||
53 | qint64 getLastReplayedRevision() | 49 | qint64 getLastReplayedRevision() |
54 | { | 50 | { |
55 | qint64 lastReplayedRevision = 0; | 51 | qint64 lastReplayedRevision = 0; |
56 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly); | 52 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly); |
57 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | 53 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", |
58 | lastReplayedRevision = value.toLongLong(); | 54 | [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { |
59 | return false; | 55 | lastReplayedRevision = value.toLongLong(); |
60 | }, [](const Storage::Error &) { | 56 | return false; |
61 | }); | 57 | }, |
58 | [](const Storage::Error &) {}); | ||
62 | return lastReplayedRevision; | 59 | return lastReplayedRevision; |
63 | } | 60 | } |
64 | 61 | ||
@@ -79,28 +76,30 @@ public slots: | |||
79 | auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly); | 76 | auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly); |
80 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite); | 77 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite); |
81 | qint64 lastReplayedRevision = 1; | 78 | qint64 lastReplayedRevision = 1; |
82 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | 79 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", |
83 | lastReplayedRevision = value.toLongLong(); | 80 | [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { |
84 | return false; | 81 | lastReplayedRevision = value.toLongLong(); |
85 | }, [](const Storage::Error &) { | 82 | return false; |
86 | }); | 83 | }, |
84 | [](const Storage::Error &) {}); | ||
87 | const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); | 85 | const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); |
88 | 86 | ||
89 | Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; | 87 | Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; |
90 | if (lastReplayedRevision <= topRevision) { | 88 | if (lastReplayedRevision <= topRevision) { |
91 | qint64 revision = lastReplayedRevision; | 89 | qint64 revision = lastReplayedRevision; |
92 | for (;revision <= topRevision; revision++) { | 90 | for (; revision <= topRevision; revision++) { |
93 | const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); | 91 | const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); |
94 | const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); | 92 | const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); |
95 | const auto key = Storage::assembleKey(uid, revision); | 93 | const auto key = Storage::assembleKey(uid, revision); |
96 | Storage::mainDatabase(mainStoreTransaction, type).scan(key, [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { | 94 | Storage::mainDatabase(mainStoreTransaction, type) |
97 | mReplayFunction(type, key, value).exec(); | 95 | .scan(key, |
98 | //TODO make for loop async, and pass to async replay function together with type | 96 | [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { |
99 | Trace() << "Replaying " << key; | 97 | mReplayFunction(type, key, value).exec(); |
100 | return false; | 98 | // TODO make for loop async, and pass to async replay function together with type |
101 | }, [key](const Storage::Error &) { | 99 | Trace() << "Replaying " << key; |
102 | ErrorMsg() << "Failed to replay change " << key; | 100 | return false; |
103 | }); | 101 | }, |
102 | [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; }); | ||
104 | } | 103 | } |
105 | revision--; | 104 | revision--; |
106 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); | 105 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); |
@@ -126,15 +125,12 @@ class CommandProcessor : public QObject | |||
126 | { | 125 | { |
127 | Q_OBJECT | 126 | Q_OBJECT |
128 | typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction; | 127 | typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction; |
128 | |||
129 | public: | 129 | public: |
130 | CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue*> commandQueues) | 130 | CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) |
131 | : QObject(), | ||
132 | mPipeline(pipeline), | ||
133 | mCommandQueues(commandQueues), | ||
134 | mProcessingLock(false) | ||
135 | { | 131 | { |
136 | mPipeline->startTransaction(); | 132 | mPipeline->startTransaction(); |
137 | //FIXME Should be initialized to the current value of the change replay queue | 133 | // FIXME Should be initialized to the current value of the change replay queue |
138 | mLowerBoundRevision = Storage::maxRevision(mPipeline->transaction()); | 134 | mLowerBoundRevision = Storage::maxRevision(mPipeline->transaction()); |
139 | mPipeline->commit(); | 135 | mPipeline->commit(); |
140 | 136 | ||
@@ -176,18 +172,20 @@ private slots: | |||
176 | return; | 172 | return; |
177 | } | 173 | } |
178 | mProcessingLock = true; | 174 | mProcessingLock = true; |
179 | auto job = processPipeline().then<void>([this]() { | 175 | auto job = processPipeline() |
180 | mProcessingLock = false; | 176 | .then<void>([this]() { |
181 | if (messagesToProcessAvailable()) { | 177 | mProcessingLock = false; |
182 | process(); | 178 | if (messagesToProcessAvailable()) { |
183 | } | 179 | process(); |
184 | }).exec(); | 180 | } |
181 | }) | ||
182 | .exec(); | ||
185 | } | 183 | } |
186 | 184 | ||
187 | KAsync::Job<qint64> processQueuedCommand(const Sink::QueuedCommand *queuedCommand) | 185 | KAsync::Job<qint64> processQueuedCommand(const Sink::QueuedCommand *queuedCommand) |
188 | { | 186 | { |
189 | Log() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId()); | 187 | Log() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId()); |
190 | //Throw command into appropriate pipeline | 188 | // Throw command into appropriate pipeline |
191 | switch (queuedCommand->commandId()) { | 189 | switch (queuedCommand->commandId()) { |
192 | case Sink::Commands::DeleteEntityCommand: | 190 | case Sink::Commands::DeleteEntityCommand: |
193 | return mPipeline->deletedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); | 191 | return mPipeline->deletedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); |
@@ -197,9 +195,7 @@ private slots: | |||
197 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); | 195 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); |
198 | case Sink::Commands::InspectionCommand: | 196 | case Sink::Commands::InspectionCommand: |
199 | if (mInspect) { | 197 | if (mInspect) { |
200 | return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<qint64>([]() { | 198 | return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<qint64>([]() { return -1; }); |
201 | return -1; | ||
202 | }); | ||
203 | } else { | 199 | } else { |
204 | return KAsync::error<qint64>(-1, "Missing inspection command."); | 200 | return KAsync::error<qint64>(-1, "Missing inspection command."); |
205 | } | 201 | } |
@@ -218,50 +214,47 @@ private slots: | |||
218 | auto queuedCommand = Sink::GetQueuedCommand(data.constData()); | 214 | auto queuedCommand = Sink::GetQueuedCommand(data.constData()); |
219 | const auto commandId = queuedCommand->commandId(); | 215 | const auto commandId = queuedCommand->commandId(); |
220 | Trace() << "Dequeued Command: " << Sink::Commands::name(commandId); | 216 | Trace() << "Dequeued Command: " << Sink::Commands::name(commandId); |
221 | return processQueuedCommand(queuedCommand).then<qint64, qint64>( | 217 | return processQueuedCommand(queuedCommand) |
222 | [commandId](qint64 createdRevision) -> qint64 { | 218 | .then<qint64, qint64>( |
223 | Trace() << "Command pipeline processed: " << Sink::Commands::name(commandId); | 219 | [commandId](qint64 createdRevision) -> qint64 { |
224 | return createdRevision; | 220 | Trace() << "Command pipeline processed: " << Sink::Commands::name(commandId); |
225 | } | 221 | return createdRevision; |
226 | , | 222 | }, |
227 | [](int errorCode, QString errorMessage) { | 223 | [](int errorCode, QString errorMessage) { |
228 | //FIXME propagate error, we didn't handle it | 224 | // FIXME propagate error, we didn't handle it |
229 | Warning() << "Error while processing queue command: " << errorMessage; | 225 | Warning() << "Error while processing queue command: " << errorMessage; |
230 | } | 226 | }); |
231 | ); | ||
232 | } | 227 | } |
233 | 228 | ||
234 | //Process all messages of this queue | 229 | // Process all messages of this queue |
235 | KAsync::Job<void> processQueue(MessageQueue *queue) | 230 | KAsync::Job<void> processQueue(MessageQueue *queue) |
236 | { | 231 | { |
237 | auto time = QSharedPointer<QTime>::create(); | 232 | auto time = QSharedPointer<QTime>::create(); |
238 | return KAsync::start<void>([this](){ | 233 | return KAsync::start<void>([this]() { mPipeline->startTransaction(); }) |
239 | mPipeline->startTransaction(); | 234 | .then(KAsync::dowhile([queue]() { return !queue->isEmpty(); }, |
240 | }).then(KAsync::dowhile( | 235 | [this, queue, time](KAsync::Future<void> &future) { |
241 | [queue]() { return !queue->isEmpty(); }, | 236 | queue->dequeueBatch(sBatchSize, |
242 | [this, queue, time](KAsync::Future<void> &future) { | 237 | [this, time](const QByteArray &data) { |
243 | queue->dequeueBatch(sBatchSize, [this, time](const QByteArray &data) { | 238 | time->start(); |
244 | time->start(); | 239 | return KAsync::start<void>([this, data, time](KAsync::Future<void> &future) { |
245 | return KAsync::start<void>([this, data, time](KAsync::Future<void> &future) { | 240 | processQueuedCommand(data) |
246 | processQueuedCommand(data).then<void, qint64>([&future, this, time](qint64 createdRevision) { | 241 | .then<void, qint64>([&future, this, time](qint64 createdRevision) { |
247 | Trace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); | 242 | Trace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); |
243 | future.setFinished(); | ||
244 | }) | ||
245 | .exec(); | ||
246 | }); | ||
247 | }) | ||
248 | .then<void>([&future, queue]() { future.setFinished(); }, | ||
249 | [&future](int i, QString error) { | ||
250 | if (i != MessageQueue::ErrorCodes::NoMessageFound) { | ||
251 | Warning() << "Error while getting message from messagequeue: " << error; | ||
252 | } | ||
248 | future.setFinished(); | 253 | future.setFinished(); |
249 | }).exec(); | 254 | }) |
250 | }); | 255 | .exec(); |
251 | } | 256 | })) |
252 | ).then<void>([&future, queue](){ | 257 | .then<void>([this]() { mPipeline->commit(); }); |
253 | future.setFinished(); | ||
254 | }, | ||
255 | [&future](int i, QString error) { | ||
256 | if (i != MessageQueue::ErrorCodes::NoMessageFound) { | ||
257 | Warning() << "Error while getting message from messagequeue: " << error; | ||
258 | } | ||
259 | future.setFinished(); | ||
260 | }).exec(); | ||
261 | } | ||
262 | )).then<void>([this]() { | ||
263 | mPipeline->commit(); | ||
264 | }); | ||
265 | } | 258 | } |
266 | 259 | ||
267 | KAsync::Job<void> processPipeline() | 260 | KAsync::Job<void> processPipeline() |
@@ -276,29 +269,29 @@ private slots: | |||
276 | mPipeline->commit(); | 269 | mPipeline->commit(); |
277 | Trace() << "Cleanup done." << Log::TraceTime(time->elapsed()); | 270 | Trace() << "Cleanup done." << Log::TraceTime(time->elapsed()); |
278 | 271 | ||
279 | //Go through all message queues | 272 | // Go through all message queues |
280 | auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); | 273 | auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues); |
281 | return KAsync::dowhile( | 274 | return KAsync::dowhile([it]() { return it->hasNext(); }, |
282 | [it]() { return it->hasNext(); }, | ||
283 | [it, this](KAsync::Future<void> &future) { | 275 | [it, this](KAsync::Future<void> &future) { |
284 | auto time = QSharedPointer<QTime>::create(); | 276 | auto time = QSharedPointer<QTime>::create(); |
285 | time->start(); | 277 | time->start(); |
286 | 278 | ||
287 | auto queue = it->next(); | 279 | auto queue = it->next(); |
288 | processQueue(queue).then<void>([&future, time]() { | 280 | processQueue(queue) |
289 | Trace() << "Queue processed." << Log::TraceTime(time->elapsed()); | 281 | .then<void>([&future, time]() { |
290 | future.setFinished(); | 282 | Trace() << "Queue processed." << Log::TraceTime(time->elapsed()); |
291 | }).exec(); | 283 | future.setFinished(); |
292 | } | 284 | }) |
293 | ); | 285 | .exec(); |
286 | }); | ||
294 | } | 287 | } |
295 | 288 | ||
296 | private: | 289 | private: |
297 | Sink::Pipeline *mPipeline; | 290 | Sink::Pipeline *mPipeline; |
298 | //Ordered by priority | 291 | // Ordered by priority |
299 | QList<MessageQueue*> mCommandQueues; | 292 | QList<MessageQueue *> mCommandQueues; |
300 | bool mProcessingLock; | 293 | bool mProcessingLock; |
301 | //The lowest revision we no longer need | 294 | // The lowest revision we no longer need |
302 | qint64 mLowerBoundRevision; | 295 | qint64 mLowerBoundRevision; |
303 | InspectionFunction mInspect; | 296 | InspectionFunction mInspect; |
304 | }; | 297 | }; |
@@ -308,14 +301,14 @@ private: | |||
308 | 301 | ||
309 | GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline) | 302 | GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline) |
310 | : Sink::Resource(), | 303 | : Sink::Resource(), |
311 | mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), | 304 | mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), |
312 | mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), | 305 | mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), |
313 | mResourceInstanceIdentifier(resourceInstanceIdentifier), | 306 | mResourceInstanceIdentifier(resourceInstanceIdentifier), |
314 | mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)), | 307 | mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)), |
315 | mError(0), | 308 | mError(0), |
316 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) | 309 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) |
317 | { | 310 | { |
318 | mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); | 311 | mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue); |
319 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { | 312 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { |
320 | flatbuffers::Verifier verifier((const uint8_t *)command, size); | 313 | flatbuffers::Verifier verifier((const uint8_t *)command, size); |
321 | if (Sink::Commands::VerifyInspectionBuffer(verifier)) { | 314 | if (Sink::Commands::VerifyInspectionBuffer(verifier)) { |
@@ -330,22 +323,26 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
330 | QDataStream s(expectedValueString); | 323 | QDataStream s(expectedValueString); |
331 | QVariant expectedValue; | 324 | QVariant expectedValue; |
332 | s >> expectedValue; | 325 | s >> expectedValue; |
333 | inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue).then<void>([=]() { | 326 | inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue) |
334 | Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; | 327 | .then<void>( |
335 | Sink::Notification n; | 328 | [=]() { |
336 | n.type = Sink::Commands::NotificationType_Inspection; | 329 | Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; |
337 | n.id = inspectionId; | 330 | Sink::Notification n; |
338 | n.code = Sink::Commands::NotificationCode_Success; | 331 | n.type = Sink::Commands::NotificationType_Inspection; |
339 | emit notify(n); | 332 | n.id = inspectionId; |
340 | }, [=](int code, const QString &message) { | 333 | n.code = Sink::Commands::NotificationCode_Success; |
341 | Log() << "Inspection failed: "<< inspectionType << inspectionId << entityId << message; | 334 | emit notify(n); |
342 | Sink::Notification n; | 335 | }, |
343 | n.type = Sink::Commands::NotificationType_Inspection; | 336 | [=](int code, const QString &message) { |
344 | n.message = message; | 337 | Log() << "Inspection failed: " << inspectionType << inspectionId << entityId << message; |
345 | n.id = inspectionId; | 338 | Sink::Notification n; |
346 | n.code = Sink::Commands::NotificationCode_Failure; | 339 | n.type = Sink::Commands::NotificationType_Inspection; |
347 | emit notify(n); | 340 | n.message = message; |
348 | }).exec(); | 341 | n.id = inspectionId; |
342 | n.code = Sink::Commands::NotificationCode_Failure; | ||
343 | emit notify(n); | ||
344 | }) | ||
345 | .exec(); | ||
349 | return KAsync::null<void>(); | 346 | return KAsync::null<void>(); |
350 | } | 347 | } |
351 | return KAsync::error<void>(-1, "Invalid inspection command."); | 348 | return KAsync::error<void>(-1, "Invalid inspection command."); |
@@ -353,9 +350,9 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
353 | QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | 350 | QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); |
354 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | 351 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); |
355 | mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) { | 352 | mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) { |
356 | //This results in a deadlock when a sync is in progress and we try to create a second writing transaction (which is why we turn changereplay off during the sync) | 353 | // This results in a deadlock when a sync is in progress and we try to create a second writing transaction (which is why we turn changereplay off during the sync) |
357 | auto synchronizationStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite); | 354 | auto synchronizationStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite); |
358 | return this->replay(*synchronizationStore, type, key, value).then<void>([synchronizationStore](){}); | 355 | return this->replay(*synchronizationStore, type, key, value).then<void>([synchronizationStore]() {}); |
359 | }); | 356 | }); |
360 | enableChangeReplay(true); | 357 | enableChangeReplay(true); |
361 | mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); | 358 | mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); |
@@ -372,7 +369,8 @@ GenericResource::~GenericResource() | |||
372 | delete mSourceChangeReplay; | 369 | delete mSourceChangeReplay; |
373 | } | 370 | } |
374 | 371 | ||
375 | KAsync::Job<void> GenericResource::inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) | 372 | KAsync::Job<void> GenericResource::inspect( |
373 | int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) | ||
376 | { | 374 | { |
377 | Warning() << "Inspection not implemented"; | 375 | Warning() << "Inspection not implemented"; |
378 | return KAsync::null<void>(); | 376 | return KAsync::null<void>(); |
@@ -390,7 +388,7 @@ void GenericResource::enableChangeReplay(bool enable) | |||
390 | } | 388 | } |
391 | } | 389 | } |
392 | 390 | ||
393 | void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor*> &preprocessors) | 391 | void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors) |
394 | { | 392 | { |
395 | mPipeline->setPreprocessors(type, preprocessors); | 393 | mPipeline->setPreprocessors(type, preprocessors); |
396 | mPipeline->setAdaptorFactory(type, factory); | 394 | mPipeline->setAdaptorFactory(type, factory); |
@@ -463,14 +461,16 @@ KAsync::Job<void> GenericResource::synchronizeWithSource() | |||
463 | { | 461 | { |
464 | return KAsync::start<void>([this]() { | 462 | return KAsync::start<void>([this]() { |
465 | Log() << " Synchronizing"; | 463 | Log() << " Synchronizing"; |
466 | //Changereplay would deadlock otherwise when trying to open the synchronization store | 464 | // Changereplay would deadlock otherwise when trying to open the synchronization store |
467 | enableChangeReplay(false); | 465 | enableChangeReplay(false); |
468 | auto mainStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier, Sink::Storage::ReadOnly); | 466 | auto mainStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier, Sink::Storage::ReadOnly); |
469 | auto syncStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite); | 467 | auto syncStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite); |
470 | synchronizeWithSource(*mainStore, *syncStore).then<void>([this, mainStore, syncStore]() { | 468 | synchronizeWithSource(*mainStore, *syncStore) |
471 | Log() << "Done Synchronizing"; | 469 | .then<void>([this, mainStore, syncStore]() { |
472 | enableChangeReplay(true); | 470 | Log() << "Done Synchronizing"; |
473 | }).exec(); | 471 | enableChangeReplay(true); |
472 | }) | ||
473 | .exec(); | ||
474 | }); | 474 | }); |
475 | } | 475 | } |
476 | 476 | ||
@@ -484,42 +484,39 @@ static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue) | |||
484 | if (queue.isEmpty()) { | 484 | if (queue.isEmpty()) { |
485 | f.setFinished(); | 485 | f.setFinished(); |
486 | } else { | 486 | } else { |
487 | QObject::connect(&queue, &MessageQueue::drained, [&f]() { | 487 | QObject::connect(&queue, &MessageQueue::drained, [&f]() { f.setFinished(); }); |
488 | f.setFinished(); | ||
489 | }); | ||
490 | } | 488 | } |
491 | }; | 489 | }; |
492 | 490 | ||
493 | KAsync::Job<void> GenericResource::processAllMessages() | 491 | KAsync::Job<void> GenericResource::processAllMessages() |
494 | { | 492 | { |
495 | //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. | 493 | // We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. |
496 | //TODO: report errors while processing sync? | 494 | // TODO: report errors while processing sync? |
497 | //TODO JOBAPI: A helper that waits for n events and then continues? | 495 | // TODO JOBAPI: A helper that waits for n events and then continues? |
498 | return KAsync::start<void>([this](KAsync::Future<void> &f) { | 496 | return KAsync::start<void>([this](KAsync::Future<void> &f) { |
499 | if (mCommitQueueTimer.isActive()) { | 497 | if (mCommitQueueTimer.isActive()) { |
500 | auto context = new QObject; | 498 | auto context = new QObject; |
501 | QObject::connect(&mCommitQueueTimer, &QTimer::timeout, context, [&f, context]() { | 499 | QObject::connect(&mCommitQueueTimer, &QTimer::timeout, context, [&f, context]() { |
502 | delete context; | 500 | delete context; |
501 | f.setFinished(); | ||
502 | }); | ||
503 | } else { | ||
504 | f.setFinished(); | ||
505 | } | ||
506 | }) | ||
507 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); }) | ||
508 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); }) | ||
509 | .then<void>([this](KAsync::Future<void> &f) { | ||
510 | if (mSourceChangeReplay->allChangesReplayed()) { | ||
503 | f.setFinished(); | 511 | f.setFinished(); |
504 | }); | 512 | } else { |
505 | } else { | 513 | auto context = new QObject; |
506 | f.setFinished(); | 514 | QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, context, [&f, context]() { |
507 | } | 515 | delete context; |
508 | }).then<void>([this](KAsync::Future<void> &f) { | 516 | f.setFinished(); |
509 | waitForDrained(f, mSynchronizerQueue); | 517 | }); |
510 | }).then<void>([this](KAsync::Future<void> &f) { | 518 | } |
511 | waitForDrained(f, mUserQueue); | 519 | }); |
512 | }).then<void>([this](KAsync::Future<void> &f) { | ||
513 | if (mSourceChangeReplay->allChangesReplayed()) { | ||
514 | f.setFinished(); | ||
515 | } else { | ||
516 | auto context = new QObject; | ||
517 | QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, context, [&f, context]() { | ||
518 | delete context; | ||
519 | f.setFinished(); | ||
520 | }); | ||
521 | } | ||
522 | }); | ||
523 | } | 520 | } |
524 | 521 | ||
525 | void GenericResource::updateLowerBoundRevision() | 522 | void GenericResource::updateLowerBoundRevision() |
@@ -533,14 +530,15 @@ void GenericResource::setLowerBoundRevision(qint64 revision) | |||
533 | updateLowerBoundRevision(); | 530 | updateLowerBoundRevision(); |
534 | } | 531 | } |
535 | 532 | ||
536 | void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | 533 | void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, |
534 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | ||
537 | { | 535 | { |
538 | //These changes are coming from the source | 536 | // These changes are coming from the source |
539 | const auto replayToSource = false; | 537 | const auto replayToSource = false; |
540 | flatbuffers::FlatBufferBuilder entityFbb; | 538 | flatbuffers::FlatBufferBuilder entityFbb; |
541 | adaptorFactory.createBuffer(domainObject, entityFbb); | 539 | adaptorFactory.createBuffer(domainObject, entityFbb); |
542 | flatbuffers::FlatBufferBuilder fbb; | 540 | flatbuffers::FlatBufferBuilder fbb; |
543 | //This is the resource type and not the domain type | 541 | // This is the resource type and not the domain type |
544 | auto entityId = fbb.CreateString(sinkId.toStdString()); | 542 | auto entityId = fbb.CreateString(sinkId.toStdString()); |
545 | auto type = fbb.CreateString(bufferType.toStdString()); | 543 | auto type = fbb.CreateString(bufferType.toStdString()); |
546 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); | 544 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); |
@@ -549,18 +547,19 @@ void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &b | |||
549 | callback(BufferUtils::extractBuffer(fbb)); | 547 | callback(BufferUtils::extractBuffer(fbb)); |
550 | } | 548 | } |
551 | 549 | ||
552 | void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | 550 | void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, |
551 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | ||
553 | { | 552 | { |
554 | //These changes are coming from the source | 553 | // These changes are coming from the source |
555 | const auto replayToSource = false; | 554 | const auto replayToSource = false; |
556 | flatbuffers::FlatBufferBuilder entityFbb; | 555 | flatbuffers::FlatBufferBuilder entityFbb; |
557 | adaptorFactory.createBuffer(domainObject, entityFbb); | 556 | adaptorFactory.createBuffer(domainObject, entityFbb); |
558 | flatbuffers::FlatBufferBuilder fbb; | 557 | flatbuffers::FlatBufferBuilder fbb; |
559 | auto entityId = fbb.CreateString(sinkId.toStdString()); | 558 | auto entityId = fbb.CreateString(sinkId.toStdString()); |
560 | //This is the resource type and not the domain type | 559 | // This is the resource type and not the domain type |
561 | auto type = fbb.CreateString(bufferType.toStdString()); | 560 | auto type = fbb.CreateString(bufferType.toStdString()); |
562 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); | 561 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); |
563 | //TODO removals | 562 | // TODO removals |
564 | auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource); | 563 | auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource); |
565 | Sink::Commands::FinishModifyEntityBuffer(fbb, location); | 564 | Sink::Commands::FinishModifyEntityBuffer(fbb, location); |
566 | callback(BufferUtils::extractBuffer(fbb)); | 565 | callback(BufferUtils::extractBuffer(fbb)); |
@@ -568,11 +567,11 @@ void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, co | |||
568 | 567 | ||
569 | void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) | 568 | void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) |
570 | { | 569 | { |
571 | //These changes are coming from the source | 570 | // These changes are coming from the source |
572 | const auto replayToSource = false; | 571 | const auto replayToSource = false; |
573 | flatbuffers::FlatBufferBuilder fbb; | 572 | flatbuffers::FlatBufferBuilder fbb; |
574 | auto entityId = fbb.CreateString(sinkId.toStdString()); | 573 | auto entityId = fbb.CreateString(sinkId.toStdString()); |
575 | //This is the resource type and not the domain type | 574 | // This is the resource type and not the domain type |
576 | auto type = fbb.CreateString(bufferType.toStdString()); | 575 | auto type = fbb.CreateString(bufferType.toStdString()); |
577 | auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); | 576 | auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); |
578 | Sink::Commands::FinishDeleteEntityBuffer(fbb, location); | 577 | Sink::Commands::FinishDeleteEntityBuffer(fbb, location); |
@@ -581,7 +580,8 @@ void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, co | |||
581 | 580 | ||
582 | void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) | 581 | void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) |
583 | { | 582 | { |
584 | Index("rid.mapping." + bufferType, transaction).add(remoteId, localId);; | 583 | Index("rid.mapping." + bufferType, transaction).add(remoteId, localId); |
584 | ; | ||
585 | Index("localid.mapping." + bufferType, transaction).add(localId, remoteId); | 585 | Index("localid.mapping." + bufferType, transaction).add(localId, remoteId); |
586 | } | 586 | } |
587 | 587 | ||
@@ -600,7 +600,7 @@ void GenericResource::updateRemoteId(const QByteArray &bufferType, const QByteAr | |||
600 | 600 | ||
601 | QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) | 601 | QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) |
602 | { | 602 | { |
603 | //Lookup local id for remote id, or insert a new pair otherwise | 603 | // Lookup local id for remote id, or insert a new pair otherwise |
604 | Index index("rid.mapping." + bufferType, transaction); | 604 | Index index("rid.mapping." + bufferType, transaction); |
605 | QByteArray sinkId = index.lookup(remoteId); | 605 | QByteArray sinkId = index.lookup(remoteId); |
606 | if (sinkId.isEmpty()) { | 606 | if (sinkId.isEmpty()) { |
@@ -621,19 +621,19 @@ QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const Q | |||
621 | return remoteId; | 621 | return remoteId; |
622 | } | 622 | } |
623 | 623 | ||
624 | void GenericResource::scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists) | 624 | void GenericResource::scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, |
625 | const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists) | ||
625 | { | 626 | { |
626 | entryGenerator([this, &transaction, bufferType, &synchronizationTransaction, &exists](const QByteArray &key) { | 627 | entryGenerator([this, &transaction, bufferType, &synchronizationTransaction, &exists](const QByteArray &key) { |
627 | auto sinkId = Sink::Storage::uidFromKey(key); | 628 | auto sinkId = Sink::Storage::uidFromKey(key); |
628 | Trace() << "Checking for removal " << key; | 629 | Trace() << "Checking for removal " << key; |
629 | const auto remoteId = resolveLocalId(bufferType, sinkId, synchronizationTransaction); | 630 | const auto remoteId = resolveLocalId(bufferType, sinkId, synchronizationTransaction); |
630 | //If we have no remoteId, the entity hasn't been replayed to the source yet | 631 | // If we have no remoteId, the entity hasn't been replayed to the source yet |
631 | if (!remoteId.isEmpty()) { | 632 | if (!remoteId.isEmpty()) { |
632 | if (!exists(remoteId)) { | 633 | if (!exists(remoteId)) { |
633 | Trace() << "Found a removed entity: " << sinkId; | 634 | Trace() << "Found a removed entity: " << sinkId; |
634 | deleteEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, [this](const QByteArray &buffer) { | 635 | deleteEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, |
635 | enqueueCommand(mSynchronizerQueue, Sink::Commands::DeleteEntityCommand, buffer); | 636 | [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::DeleteEntityCommand, buffer); }); |
636 | }); | ||
637 | } | 637 | } |
638 | } | 638 | } |
639 | }); | 639 | }); |
@@ -642,32 +642,31 @@ void GenericResource::scanForRemovals(Sink::Storage::Transaction &transaction, S | |||
642 | static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) | 642 | static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) |
643 | { | 643 | { |
644 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | 644 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; |
645 | db.findLatest(uid, [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | 645 | db.findLatest(uid, |
646 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 646 | [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { |
647 | if (!buffer.isValid()) { | 647 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
648 | Warning() << "Read invalid buffer from disk"; | 648 | if (!buffer.isValid()) { |
649 | } else { | 649 | Warning() << "Read invalid buffer from disk"; |
650 | current = adaptorFactory.createAdaptor(buffer.entity()); | 650 | } else { |
651 | } | 651 | current = adaptorFactory.createAdaptor(buffer.entity()); |
652 | return false; | 652 | } |
653 | }, | 653 | return false; |
654 | [](const Sink::Storage::Error &error) { | 654 | }, |
655 | Warning() << "Failed to read current value from storage: " << error.message; | 655 | [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); |
656 | }); | ||
657 | return current; | 656 | return current; |
658 | } | 657 | } |
659 | 658 | ||
660 | void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) | 659 | void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, |
660 | DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) | ||
661 | { | 661 | { |
662 | auto mainDatabase = Storage::mainDatabase(transaction, bufferType); | 662 | auto mainDatabase = Storage::mainDatabase(transaction, bufferType); |
663 | const auto sinkId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction); | 663 | const auto sinkId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction); |
664 | const auto found = mainDatabase.contains(sinkId); | 664 | const auto found = mainDatabase.contains(sinkId); |
665 | if (!found) { | 665 | if (!found) { |
666 | Trace() << "Found a new entity: " << remoteId; | 666 | Trace() << "Found a new entity: " << remoteId; |
667 | createEntity(sinkId, bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { | 667 | createEntity( |
668 | enqueueCommand(mSynchronizerQueue, Sink::Commands::CreateEntityCommand, buffer); | 668 | sinkId, bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::CreateEntityCommand, buffer); }); |
669 | }); | 669 | } else { // modification |
670 | } else { //modification | ||
671 | if (auto current = getLatest(mainDatabase, sinkId, adaptorFactory)) { | 670 | if (auto current = getLatest(mainDatabase, sinkId, adaptorFactory)) { |
672 | bool changed = false; | 671 | bool changed = false; |
673 | for (const auto &property : entity.changedProperties()) { | 672 | for (const auto &property : entity.changedProperties()) { |
@@ -678,9 +677,8 @@ void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Si | |||
678 | } | 677 | } |
679 | if (changed) { | 678 | if (changed) { |
680 | Trace() << "Found a modified entity: " << remoteId; | 679 | Trace() << "Found a modified entity: " << remoteId; |
681 | modifyEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { | 680 | modifyEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, entity, adaptorFactory, |
682 | enqueueCommand(mSynchronizerQueue, Sink::Commands::ModifyEntityCommand, buffer); | 681 | [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::ModifyEntityCommand, buffer); }); |
683 | }); | ||
684 | } | 682 | } |
685 | } else { | 683 | } else { |
686 | Warning() << "Failed to get current entity"; | 684 | Warning() << "Failed to get current entity"; |