diff options
Diffstat (limited to 'common')
-rw-r--r-- | common/changereplay.cpp | 2 | ||||
-rw-r--r-- | common/genericresource.cpp | 31 | ||||
-rw-r--r-- | common/genericresource.h | 7 | ||||
-rw-r--r-- | common/sourcewriteback.cpp | 30 | ||||
-rw-r--r-- | common/storage.h | 5 | ||||
-rw-r--r-- | common/storage_lmdb.cpp | 21 | ||||
-rw-r--r-- | common/synchronizer.cpp | 43 | ||||
-rw-r--r-- | common/synchronizer.h | 17 |
8 files changed, 106 insertions, 50 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp index 2447b6e..63c41c8 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp | |||
@@ -85,9 +85,9 @@ void ChangeReplay::revisionChanged() | |||
85 | Storage::mainDatabase(mainStoreTransaction, type) | 85 | Storage::mainDatabase(mainStoreTransaction, type) |
86 | .scan(key, | 86 | .scan(key, |
87 | [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { | 87 | [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { |
88 | Trace() << "Replaying " << key; | ||
88 | replay(type, key, value).exec(); | 89 | replay(type, key, value).exec(); |
89 | // TODO make for loop async, and pass to async replay function together with type | 90 | // TODO make for loop async, and pass to async replay function together with type |
90 | Trace() << "Replaying " << key; | ||
91 | return false; | 91 | return false; |
92 | }, | 92 | }, |
93 | [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; }); | 93 | [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; }); |
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 568e066..cd3ea02 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -233,22 +233,17 @@ private: | |||
233 | #undef DEBUG_AREA | 233 | #undef DEBUG_AREA |
234 | #define DEBUG_AREA "resource" | 234 | #define DEBUG_AREA "resource" |
235 | 235 | ||
236 | GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline, const QSharedPointer<ChangeReplay> &changeReplay, const QSharedPointer<Synchronizer> &synchronizer) | 236 | GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline ) |
237 | : Sink::Resource(), | 237 | : Sink::Resource(), |
238 | mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), | 238 | mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), |
239 | mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), | 239 | mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), |
240 | mResourceType(resourceType), | 240 | mResourceType(resourceType), |
241 | mResourceInstanceIdentifier(resourceInstanceIdentifier), | 241 | mResourceInstanceIdentifier(resourceInstanceIdentifier), |
242 | mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)), | 242 | mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)), |
243 | mChangeReplay(changeReplay), | ||
244 | mSynchronizer(synchronizer), | ||
245 | mError(0), | 243 | mError(0), |
246 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) | 244 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) |
247 | { | 245 | { |
248 | mPipeline->setResourceType(mResourceType); | 246 | mPipeline->setResourceType(mResourceType); |
249 | mSynchronizer->setup([this](int commandId, const QByteArray &data) { | ||
250 | enqueueCommand(mSynchronizerQueue, commandId, data); | ||
251 | }); | ||
252 | mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue); | 247 | mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue); |
253 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { | 248 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { |
254 | flatbuffers::Verifier verifier((const uint8_t *)command, size); | 249 | flatbuffers::Verifier verifier((const uint8_t *)command, size); |
@@ -290,9 +285,7 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra | |||
290 | }); | 285 | }); |
291 | QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | 286 | QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); |
292 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | 287 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); |
293 | enableChangeReplay(true); | ||
294 | mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); | 288 | mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); |
295 | mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision()); | ||
296 | 289 | ||
297 | mCommitQueueTimer.setInterval(sCommitInterval); | 290 | mCommitQueueTimer.setInterval(sCommitInterval); |
298 | mCommitQueueTimer.setSingleShot(true); | 291 | mCommitQueueTimer.setSingleShot(true); |
@@ -313,6 +306,7 @@ KAsync::Job<void> GenericResource::inspect( | |||
313 | 306 | ||
314 | void GenericResource::enableChangeReplay(bool enable) | 307 | void GenericResource::enableChangeReplay(bool enable) |
315 | { | 308 | { |
309 | Q_ASSERT(mChangeReplay); | ||
316 | if (enable) { | 310 | if (enable) { |
317 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); | 311 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); |
318 | QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | 312 | QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); |
@@ -323,11 +317,25 @@ void GenericResource::enableChangeReplay(bool enable) | |||
323 | } | 317 | } |
324 | } | 318 | } |
325 | 319 | ||
326 | void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors) | 320 | void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors) |
327 | { | 321 | { |
328 | mPipeline->setPreprocessors(type, preprocessors); | 322 | mPipeline->setPreprocessors(type, preprocessors); |
329 | } | 323 | } |
330 | 324 | ||
325 | void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer) | ||
326 | { | ||
327 | mSynchronizer = synchronizer; | ||
328 | mSynchronizer->setup([this](int commandId, const QByteArray &data) { | ||
329 | enqueueCommand(mSynchronizerQueue, commandId, data); | ||
330 | }); | ||
331 | } | ||
332 | |||
333 | void GenericResource::setupChangereplay(const QSharedPointer<ChangeReplay> &changeReplay) | ||
334 | { | ||
335 | mChangeReplay = changeReplay; | ||
336 | mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision()); | ||
337 | enableChangeReplay(true); | ||
338 | } | ||
331 | 339 | ||
332 | void GenericResource::removeDataFromDisk() | 340 | void GenericResource::removeDataFromDisk() |
333 | { | 341 | { |
@@ -406,11 +414,6 @@ KAsync::Job<void> GenericResource::synchronizeWithSource() | |||
406 | }); | 414 | }); |
407 | } | 415 | } |
408 | 416 | ||
409 | KAsync::Job<void> GenericResource::synchronizeWithSource(Sink::Storage &mainStore, Sink::Storage &synchronizationStore) | ||
410 | { | ||
411 | return KAsync::null<void>(); | ||
412 | } | ||
413 | |||
414 | static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue) | 417 | static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue) |
415 | { | 418 | { |
416 | if (queue.isEmpty()) { | 419 | if (queue.isEmpty()) { |
diff --git a/common/genericresource.h b/common/genericresource.h index 4ed408d..0878968 100644 --- a/common/genericresource.h +++ b/common/genericresource.h | |||
@@ -41,12 +41,11 @@ class Synchronizer; | |||
41 | class SINK_EXPORT GenericResource : public Resource | 41 | class SINK_EXPORT GenericResource : public Resource |
42 | { | 42 | { |
43 | public: | 43 | public: |
44 | GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline, const QSharedPointer<ChangeReplay> &changeReplay, const QSharedPointer<Synchronizer> &synchronizer); | 44 | GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline); |
45 | virtual ~GenericResource(); | 45 | virtual ~GenericResource(); |
46 | 46 | ||
47 | virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; | 47 | virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; |
48 | virtual KAsync::Job<void> synchronizeWithSource() Q_DECL_OVERRIDE; | 48 | virtual KAsync::Job<void> synchronizeWithSource() Q_DECL_OVERRIDE; |
49 | virtual KAsync::Job<void> synchronizeWithSource(Sink::Storage &mainStore, Sink::Storage &synchronizationStore); | ||
50 | virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE; | 49 | virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE; |
51 | virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE; | 50 | virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE; |
52 | virtual KAsync::Job<void> | 51 | virtual KAsync::Job<void> |
@@ -64,7 +63,9 @@ private slots: | |||
64 | protected: | 63 | protected: |
65 | void enableChangeReplay(bool); | 64 | void enableChangeReplay(bool); |
66 | 65 | ||
67 | void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors); | 66 | void setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors); |
67 | void setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer); | ||
68 | void setupChangereplay(const QSharedPointer<ChangeReplay> &changeReplay); | ||
68 | 69 | ||
69 | void onProcessorError(int errorCode, const QString &errorMessage); | 70 | void onProcessorError(int errorCode, const QString &errorMessage); |
70 | void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); | 71 | void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); |
diff --git a/common/sourcewriteback.cpp b/common/sourcewriteback.cpp index 1ef20d2..1c07577 100644 --- a/common/sourcewriteback.cpp +++ b/common/sourcewriteback.cpp | |||
@@ -54,8 +54,7 @@ RemoteIdMap &SourceWriteBack::syncStore() | |||
54 | 54 | ||
55 | KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) | 55 | KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) |
56 | { | 56 | { |
57 | mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); | 57 | Trace() << "Replaying" << type << key; |
58 | mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); | ||
59 | 58 | ||
60 | Sink::EntityBuffer buffer(value); | 59 | Sink::EntityBuffer buffer(value); |
61 | const Sink::Entity &entity = buffer.entity(); | 60 | const Sink::Entity &entity = buffer.entity(); |
@@ -65,7 +64,14 @@ KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArr | |||
65 | Trace() << "Change is coming from the source"; | 64 | Trace() << "Change is coming from the source"; |
66 | return KAsync::null<void>(); | 65 | return KAsync::null<void>(); |
67 | } | 66 | } |
68 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | 67 | Q_ASSERT(!mSyncStore); |
68 | Q_ASSERT(!mEntityStore); | ||
69 | Q_ASSERT(!mTransaction); | ||
70 | Q_ASSERT(!mSyncTransaction); | ||
71 | mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); | ||
72 | mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); | ||
73 | |||
74 | // const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | ||
69 | const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; | 75 | const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; |
70 | const auto uid = Sink::Storage::uidFromKey(key); | 76 | const auto uid = Sink::Storage::uidFromKey(key); |
71 | QByteArray oldRemoteId; | 77 | QByteArray oldRemoteId; |
@@ -84,32 +90,38 @@ KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArr | |||
84 | job = replay(mail, operation, oldRemoteId); | 90 | job = replay(mail, operation, oldRemoteId); |
85 | } | 91 | } |
86 | 92 | ||
87 | return job.then<void, QByteArray>([this, operation, type, uid](const QByteArray &remoteId) { | 93 | return job.then<void, QByteArray>([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { |
88 | Trace() << "Replayed change with remote id: " << remoteId; | ||
89 | if (operation == Sink::Operation_Creation) { | 94 | if (operation == Sink::Operation_Creation) { |
95 | Trace() << "Replayed creation with remote id: " << remoteId; | ||
90 | if (remoteId.isEmpty()) { | 96 | if (remoteId.isEmpty()) { |
91 | Warning() << "Returned an empty remoteId from the creation"; | 97 | Warning() << "Returned an empty remoteId from the creation"; |
92 | } else { | 98 | } else { |
93 | syncStore().recordRemoteId(type, uid, remoteId); | 99 | syncStore().recordRemoteId(type, uid, remoteId); |
94 | } | 100 | } |
95 | } else if (operation == Sink::Operation_Modification) { | 101 | } else if (operation == Sink::Operation_Modification) { |
102 | Trace() << "Replayed modification with remote id: " << remoteId; | ||
96 | if (remoteId.isEmpty()) { | 103 | if (remoteId.isEmpty()) { |
97 | Warning() << "Returned an empty remoteId from the creation"; | 104 | Warning() << "Returned an empty remoteId from the creation"; |
98 | } else { | 105 | } else { |
99 | syncStore().updateRemoteId(type, uid, remoteId); | 106 | syncStore().updateRemoteId(type, uid, remoteId); |
100 | } | 107 | } |
101 | } else if (operation == Sink::Operation_Removal) { | 108 | } else if (operation == Sink::Operation_Removal) { |
102 | syncStore().removeRemoteId(type, uid, remoteId); | 109 | Trace() << "Replayed removal with remote id: " << oldRemoteId; |
110 | syncStore().removeRemoteId(type, uid, oldRemoteId); | ||
103 | } else { | 111 | } else { |
104 | Warning() << "Unkown operation" << operation; | 112 | ErrorMsg() << "Unkown operation" << operation; |
105 | } | 113 | } |
106 | 114 | ||
115 | mSyncStore.clear(); | ||
116 | mEntityStore.clear(); | ||
107 | mTransaction.abort(); | 117 | mTransaction.abort(); |
108 | mSyncTransaction.commit(); | 118 | mSyncTransaction.commit(); |
119 | }, [this](int errorCode, const QString &errorMessage) { | ||
120 | Warning() << "Failed to replay change: " << errorMessage; | ||
109 | mSyncStore.clear(); | 121 | mSyncStore.clear(); |
110 | mEntityStore.clear(); | 122 | mEntityStore.clear(); |
111 | }, [](int errorCode, const QString &errorMessage) { | 123 | mTransaction.abort(); |
112 | Warning() << "Failed to replay change: " << errorMessage; | 124 | mSyncTransaction.commit(); |
113 | }); | 125 | }); |
114 | } | 126 | } |
115 | 127 | ||
diff --git a/common/storage.h b/common/storage.h index b051daa..87573e2 100644 --- a/common/storage.h +++ b/common/storage.h | |||
@@ -157,10 +157,7 @@ public: | |||
157 | return *this; | 157 | return *this; |
158 | } | 158 | } |
159 | 159 | ||
160 | operator bool() const | 160 | operator bool() const; |
161 | { | ||
162 | return (d != nullptr); | ||
163 | } | ||
164 | 161 | ||
165 | private: | 162 | private: |
166 | Transaction(Transaction &other); | 163 | Transaction(Transaction &other); |
diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp index 4ed9525..cc8b28d 100644 --- a/common/storage_lmdb.cpp +++ b/common/storage_lmdb.cpp | |||
@@ -347,7 +347,7 @@ class Storage::Transaction::Private | |||
347 | { | 347 | { |
348 | public: | 348 | public: |
349 | Private(bool _requestRead, const std::function<void(const Storage::Error &error)> &_defaultErrorHandler, const QString &_name, MDB_env *_env) | 349 | Private(bool _requestRead, const std::function<void(const Storage::Error &error)> &_defaultErrorHandler, const QString &_name, MDB_env *_env) |
350 | : env(_env), requestedRead(_requestRead), defaultErrorHandler(_defaultErrorHandler), name(_name), implicitCommit(false), error(false), modificationCounter(0) | 350 | : env(_env), transaction(nullptr), requestedRead(_requestRead), defaultErrorHandler(_defaultErrorHandler), name(_name), implicitCommit(false), error(false), modificationCounter(0) |
351 | { | 351 | { |
352 | } | 352 | } |
353 | ~Private() | 353 | ~Private() |
@@ -366,8 +366,15 @@ public: | |||
366 | 366 | ||
367 | void startTransaction() | 367 | void startTransaction() |
368 | { | 368 | { |
369 | // qDebug() << "Opening transaction " << requestedRead; | 369 | Q_ASSERT(!transaction); |
370 | // auto f = [](const char *msg, void *ctx) -> int { | ||
371 | // qDebug() << msg; | ||
372 | // return 0; | ||
373 | // }; | ||
374 | // mdb_reader_list(env, f, nullptr); | ||
375 | // Trace_area("storage." + name.toLatin1()) << "Opening transaction " << requestedRead; | ||
370 | const int rc = mdb_txn_begin(env, NULL, requestedRead ? MDB_RDONLY : 0, &transaction); | 376 | const int rc = mdb_txn_begin(env, NULL, requestedRead ? MDB_RDONLY : 0, &transaction); |
377 | // Trace_area("storage." + name.toLatin1()) << "Started transaction " << mdb_txn_id(transaction) << transaction; | ||
371 | if (rc) { | 378 | if (rc) { |
372 | defaultErrorHandler(Error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening transaction: " + QByteArray(mdb_strerror(rc)))); | 379 | defaultErrorHandler(Error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening transaction: " + QByteArray(mdb_strerror(rc)))); |
373 | } | 380 | } |
@@ -387,22 +394,27 @@ Storage::Transaction::~Transaction() | |||
387 | { | 394 | { |
388 | if (d && d->transaction) { | 395 | if (d && d->transaction) { |
389 | if (d->implicitCommit && !d->error) { | 396 | if (d->implicitCommit && !d->error) { |
390 | // qDebug() << "implicit commit"; | ||
391 | commit(); | 397 | commit(); |
392 | } else { | 398 | } else { |
393 | // qDebug() << "Aorting transaction"; | 399 | // Trace_area("storage." + d->name.toLatin1()) << "Aborting transaction" << mdb_txn_id(d->transaction) << d->transaction; |
394 | mdb_txn_abort(d->transaction); | 400 | mdb_txn_abort(d->transaction); |
395 | } | 401 | } |
396 | } | 402 | } |
397 | delete d; | 403 | delete d; |
398 | } | 404 | } |
399 | 405 | ||
406 | Storage::Transaction::operator bool() const | ||
407 | { | ||
408 | return (d && d->transaction); | ||
409 | } | ||
410 | |||
400 | bool Storage::Transaction::commit(const std::function<void(const Storage::Error &error)> &errorHandler) | 411 | bool Storage::Transaction::commit(const std::function<void(const Storage::Error &error)> &errorHandler) |
401 | { | 412 | { |
402 | if (!d || !d->transaction) { | 413 | if (!d || !d->transaction) { |
403 | return false; | 414 | return false; |
404 | } | 415 | } |
405 | 416 | ||
417 | // Trace_area("storage." + d->name.toLatin1()) << "Committing transaction" << mdb_txn_id(d->transaction) << d->transaction; | ||
406 | const int rc = mdb_txn_commit(d->transaction); | 418 | const int rc = mdb_txn_commit(d->transaction); |
407 | if (rc) { | 419 | if (rc) { |
408 | mdb_txn_abort(d->transaction); | 420 | mdb_txn_abort(d->transaction); |
@@ -420,6 +432,7 @@ void Storage::Transaction::abort() | |||
420 | return; | 432 | return; |
421 | } | 433 | } |
422 | 434 | ||
435 | // Trace_area("storage." + d->name.toLatin1()) << "Aborting transaction" << mdb_txn_id(d->transaction) << d->transaction; | ||
423 | mdb_txn_abort(d->transaction); | 436 | mdb_txn_abort(d->transaction); |
424 | d->transaction = nullptr; | 437 | d->transaction = nullptr; |
425 | } | 438 | } |
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index fb0baaa..b264662 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp | |||
@@ -54,7 +54,7 @@ void Synchronizer::enqueueCommand(int commandId, const QByteArray &data) | |||
54 | EntityStore &Synchronizer::store() | 54 | EntityStore &Synchronizer::store() |
55 | { | 55 | { |
56 | if (!mEntityStore) { | 56 | if (!mEntityStore) { |
57 | mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction); | 57 | mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, transaction()); |
58 | } | 58 | } |
59 | return *mEntityStore; | 59 | return *mEntityStore; |
60 | } | 60 | } |
@@ -62,7 +62,7 @@ EntityStore &Synchronizer::store() | |||
62 | RemoteIdMap &Synchronizer::syncStore() | 62 | RemoteIdMap &Synchronizer::syncStore() |
63 | { | 63 | { |
64 | if (!mSyncStore) { | 64 | if (!mSyncStore) { |
65 | mSyncStore = QSharedPointer<RemoteIdMap>::create(mSyncTransaction); | 65 | mSyncStore = QSharedPointer<RemoteIdMap>::create(syncTransaction()); |
66 | } | 66 | } |
67 | return *mSyncStore; | 67 | return *mSyncStore; |
68 | } | 68 | } |
@@ -125,7 +125,7 @@ void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::func | |||
125 | if (!remoteId.isEmpty()) { | 125 | if (!remoteId.isEmpty()) { |
126 | if (!exists(remoteId)) { | 126 | if (!exists(remoteId)) { |
127 | Trace() << "Found a removed entity: " << sinkId; | 127 | Trace() << "Found a removed entity: " << sinkId; |
128 | deleteEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, | 128 | deleteEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, |
129 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); }); | 129 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); }); |
130 | } | 130 | } |
131 | } | 131 | } |
@@ -135,7 +135,7 @@ void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::func | |||
135 | void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) | 135 | void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) |
136 | { | 136 | { |
137 | Trace() << "Create or modify" << bufferType << remoteId; | 137 | Trace() << "Create or modify" << bufferType << remoteId; |
138 | auto mainDatabase = Storage::mainDatabase(mTransaction, bufferType); | 138 | auto mainDatabase = Storage::mainDatabase(transaction(), bufferType); |
139 | const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); | 139 | const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); |
140 | const auto found = mainDatabase.contains(sinkId); | 140 | const auto found = mainDatabase.contains(sinkId); |
141 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); | 141 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); |
@@ -154,7 +154,7 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray | |||
154 | } | 154 | } |
155 | if (changed) { | 155 | if (changed) { |
156 | Trace() << "Found a modified entity: " << remoteId; | 156 | Trace() << "Found a modified entity: " << remoteId; |
157 | modifyEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, entity, *adaptorFactory, | 157 | modifyEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, entity, *adaptorFactory, |
158 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); | 158 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); |
159 | } | 159 | } |
160 | } else { | 160 | } else { |
@@ -165,12 +165,37 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray | |||
165 | 165 | ||
166 | KAsync::Job<void> Synchronizer::synchronize() | 166 | KAsync::Job<void> Synchronizer::synchronize() |
167 | { | 167 | { |
168 | mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); | 168 | Trace() << "Synchronizing"; |
169 | mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); | ||
170 | return synchronizeWithSource().then<void>([this]() { | 169 | return synchronizeWithSource().then<void>([this]() { |
171 | mTransaction.abort(); | ||
172 | mSyncTransaction.commit(); | ||
173 | mSyncStore.clear(); | 170 | mSyncStore.clear(); |
174 | mEntityStore.clear(); | 171 | mEntityStore.clear(); |
175 | }); | 172 | }); |
176 | } | 173 | } |
174 | |||
175 | void Synchronizer::commit() | ||
176 | { | ||
177 | mTransaction.abort(); | ||
178 | } | ||
179 | |||
180 | void Synchronizer::commitSync() | ||
181 | { | ||
182 | mSyncTransaction.commit(); | ||
183 | } | ||
184 | |||
185 | Sink::Storage::Transaction &Synchronizer::transaction() | ||
186 | { | ||
187 | if (!mTransaction) { | ||
188 | Trace() << "Starting transaction"; | ||
189 | mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); | ||
190 | } | ||
191 | return mTransaction; | ||
192 | } | ||
193 | |||
194 | Sink::Storage::Transaction &Synchronizer::syncTransaction() | ||
195 | { | ||
196 | if (!mSyncTransaction) { | ||
197 | Trace() << "Starting transaction"; | ||
198 | mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); | ||
199 | } | ||
200 | return mSyncTransaction; | ||
201 | } | ||
diff --git a/common/synchronizer.h b/common/synchronizer.h index 61bca7d..17e7003 100644 --- a/common/synchronizer.h +++ b/common/synchronizer.h | |||
@@ -41,6 +41,17 @@ public: | |||
41 | void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback); | 41 | void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback); |
42 | KAsync::Job<void> synchronize(); | 42 | KAsync::Job<void> synchronize(); |
43 | 43 | ||
44 | //Read only access to main storage | ||
45 | EntityStore &store(); | ||
46 | |||
47 | //Read/Write access to sync storage | ||
48 | RemoteIdMap &syncStore(); | ||
49 | |||
50 | void commit(); | ||
51 | void commitSync(); | ||
52 | Sink::Storage::Transaction &transaction(); | ||
53 | Sink::Storage::Transaction &syncTransaction(); | ||
54 | |||
44 | protected: | 55 | protected: |
45 | ///Calls the callback to enqueue the command | 56 | ///Calls the callback to enqueue the command |
46 | void enqueueCommand(int commandId, const QByteArray &data); | 57 | void enqueueCommand(int commandId, const QByteArray &data); |
@@ -71,12 +82,6 @@ protected: | |||
71 | */ | 82 | */ |
72 | void createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); | 83 | void createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); |
73 | 84 | ||
74 | //Read only access to main storage | ||
75 | EntityStore &store(); | ||
76 | |||
77 | //Read/Write access to sync storage | ||
78 | RemoteIdMap &syncStore(); | ||
79 | |||
80 | virtual KAsync::Job<void> synchronizeWithSource() = 0; | 85 | virtual KAsync::Job<void> synchronizeWithSource() = 0; |
81 | 86 | ||
82 | private: | 87 | private: |