summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-05-29 15:19:21 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-05-29 15:19:21 +0200
commitdabd408dcd372f16c7934597db30346869cd8ad8 (patch)
tree0d6513204b7fa6e34cf50733ad0472866ea05e2a
parentb441386c4e138d19bbd79d578e0a2ff1b3f54a93 (diff)
downloadsink-dabd408dcd372f16c7934597db30346869cd8ad8.tar.gz
sink-dabd408dcd372f16c7934597db30346869cd8ad8.zip
Fixed genericresource so it works with the maildirresourcetest
-rw-r--r--common/changereplay.cpp2
-rw-r--r--common/genericresource.cpp31
-rw-r--r--common/genericresource.h7
-rw-r--r--common/sourcewriteback.cpp30
-rw-r--r--common/storage.h5
-rw-r--r--common/storage_lmdb.cpp21
-rw-r--r--common/synchronizer.cpp43
-rw-r--r--common/synchronizer.h17
-rw-r--r--examples/dummyresource/resourcefactory.cpp13
-rw-r--r--examples/dummyresource/resourcefactory.h4
-rw-r--r--tests/testimplementations.h2
11 files changed, 113 insertions, 62 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp
index 2447b6e..63c41c8 100644
--- a/common/changereplay.cpp
+++ b/common/changereplay.cpp
@@ -85,9 +85,9 @@ void ChangeReplay::revisionChanged()
85 Storage::mainDatabase(mainStoreTransaction, type) 85 Storage::mainDatabase(mainStoreTransaction, type)
86 .scan(key, 86 .scan(key,
87 [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { 87 [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool {
88 Trace() << "Replaying " << key;
88 replay(type, key, value).exec(); 89 replay(type, key, value).exec();
89 // TODO make for loop async, and pass to async replay function together with type 90 // TODO make for loop async, and pass to async replay function together with type
90 Trace() << "Replaying " << key;
91 return false; 91 return false;
92 }, 92 },
93 [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; }); 93 [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; });
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 568e066..cd3ea02 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -233,22 +233,17 @@ private:
233#undef DEBUG_AREA 233#undef DEBUG_AREA
234#define DEBUG_AREA "resource" 234#define DEBUG_AREA "resource"
235 235
236GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline, const QSharedPointer<ChangeReplay> &changeReplay, const QSharedPointer<Synchronizer> &synchronizer) 236GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline )
237 : Sink::Resource(), 237 : Sink::Resource(),
238 mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), 238 mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"),
239 mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), 239 mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"),
240 mResourceType(resourceType), 240 mResourceType(resourceType),
241 mResourceInstanceIdentifier(resourceInstanceIdentifier), 241 mResourceInstanceIdentifier(resourceInstanceIdentifier),
242 mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)), 242 mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)),
243 mChangeReplay(changeReplay),
244 mSynchronizer(synchronizer),
245 mError(0), 243 mError(0),
246 mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) 244 mClientLowerBoundRevision(std::numeric_limits<qint64>::max())
247{ 245{
248 mPipeline->setResourceType(mResourceType); 246 mPipeline->setResourceType(mResourceType);
249 mSynchronizer->setup([this](int commandId, const QByteArray &data) {
250 enqueueCommand(mSynchronizerQueue, commandId, data);
251 });
252 mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue); 247 mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue);
253 mProcessor->setInspectionCommand([this](void const *command, size_t size) { 248 mProcessor->setInspectionCommand([this](void const *command, size_t size) {
254 flatbuffers::Verifier verifier((const uint8_t *)command, size); 249 flatbuffers::Verifier verifier((const uint8_t *)command, size);
@@ -290,9 +285,7 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra
290 }); 285 });
291 QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 286 QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
292 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); 287 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
293 enableChangeReplay(true);
294 mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); 288 mClientLowerBoundRevision = mPipeline->cleanedUpRevision();
295 mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision());
296 289
297 mCommitQueueTimer.setInterval(sCommitInterval); 290 mCommitQueueTimer.setInterval(sCommitInterval);
298 mCommitQueueTimer.setSingleShot(true); 291 mCommitQueueTimer.setSingleShot(true);
@@ -313,6 +306,7 @@ KAsync::Job<void> GenericResource::inspect(
313 306
314void GenericResource::enableChangeReplay(bool enable) 307void GenericResource::enableChangeReplay(bool enable)
315{ 308{
309 Q_ASSERT(mChangeReplay);
316 if (enable) { 310 if (enable) {
317 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); 311 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection);
318 QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); 312 QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
@@ -323,11 +317,25 @@ void GenericResource::enableChangeReplay(bool enable)
323 } 317 }
324} 318}
325 319
326void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors) 320void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors)
327{ 321{
328 mPipeline->setPreprocessors(type, preprocessors); 322 mPipeline->setPreprocessors(type, preprocessors);
329} 323}
330 324
325void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer)
326{
327 mSynchronizer = synchronizer;
328 mSynchronizer->setup([this](int commandId, const QByteArray &data) {
329 enqueueCommand(mSynchronizerQueue, commandId, data);
330 });
331}
332
333void GenericResource::setupChangereplay(const QSharedPointer<ChangeReplay> &changeReplay)
334{
335 mChangeReplay = changeReplay;
336 mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision());
337 enableChangeReplay(true);
338}
331 339
332void GenericResource::removeDataFromDisk() 340void GenericResource::removeDataFromDisk()
333{ 341{
@@ -406,11 +414,6 @@ KAsync::Job<void> GenericResource::synchronizeWithSource()
406 }); 414 });
407} 415}
408 416
409KAsync::Job<void> GenericResource::synchronizeWithSource(Sink::Storage &mainStore, Sink::Storage &synchronizationStore)
410{
411 return KAsync::null<void>();
412}
413
414static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue) 417static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue)
415{ 418{
416 if (queue.isEmpty()) { 419 if (queue.isEmpty()) {
diff --git a/common/genericresource.h b/common/genericresource.h
index 4ed408d..0878968 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -41,12 +41,11 @@ class Synchronizer;
41class SINK_EXPORT GenericResource : public Resource 41class SINK_EXPORT GenericResource : public Resource
42{ 42{
43public: 43public:
44 GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline, const QSharedPointer<ChangeReplay> &changeReplay, const QSharedPointer<Synchronizer> &synchronizer); 44 GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline);
45 virtual ~GenericResource(); 45 virtual ~GenericResource();
46 46
47 virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; 47 virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE;
48 virtual KAsync::Job<void> synchronizeWithSource() Q_DECL_OVERRIDE; 48 virtual KAsync::Job<void> synchronizeWithSource() Q_DECL_OVERRIDE;
49 virtual KAsync::Job<void> synchronizeWithSource(Sink::Storage &mainStore, Sink::Storage &synchronizationStore);
50 virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE; 49 virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE;
51 virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE; 50 virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE;
52 virtual KAsync::Job<void> 51 virtual KAsync::Job<void>
@@ -64,7 +63,9 @@ private slots:
64protected: 63protected:
65 void enableChangeReplay(bool); 64 void enableChangeReplay(bool);
66 65
67 void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors); 66 void setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors);
67 void setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer);
68 void setupChangereplay(const QSharedPointer<ChangeReplay> &changeReplay);
68 69
69 void onProcessorError(int errorCode, const QString &errorMessage); 70 void onProcessorError(int errorCode, const QString &errorMessage);
70 void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); 71 void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data);
diff --git a/common/sourcewriteback.cpp b/common/sourcewriteback.cpp
index 1ef20d2..1c07577 100644
--- a/common/sourcewriteback.cpp
+++ b/common/sourcewriteback.cpp
@@ -54,8 +54,7 @@ RemoteIdMap &SourceWriteBack::syncStore()
54 54
55KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) 55KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value)
56{ 56{
57 mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); 57 Trace() << "Replaying" << type << key;
58 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite);
59 58
60 Sink::EntityBuffer buffer(value); 59 Sink::EntityBuffer buffer(value);
61 const Sink::Entity &entity = buffer.entity(); 60 const Sink::Entity &entity = buffer.entity();
@@ -65,7 +64,14 @@ KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArr
65 Trace() << "Change is coming from the source"; 64 Trace() << "Change is coming from the source";
66 return KAsync::null<void>(); 65 return KAsync::null<void>();
67 } 66 }
68 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; 67 Q_ASSERT(!mSyncStore);
68 Q_ASSERT(!mEntityStore);
69 Q_ASSERT(!mTransaction);
70 Q_ASSERT(!mSyncTransaction);
71 mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly);
72 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite);
73
74 // const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
69 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; 75 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation;
70 const auto uid = Sink::Storage::uidFromKey(key); 76 const auto uid = Sink::Storage::uidFromKey(key);
71 QByteArray oldRemoteId; 77 QByteArray oldRemoteId;
@@ -84,32 +90,38 @@ KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArr
84 job = replay(mail, operation, oldRemoteId); 90 job = replay(mail, operation, oldRemoteId);
85 } 91 }
86 92
87 return job.then<void, QByteArray>([this, operation, type, uid](const QByteArray &remoteId) { 93 return job.then<void, QByteArray>([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) {
88 Trace() << "Replayed change with remote id: " << remoteId;
89 if (operation == Sink::Operation_Creation) { 94 if (operation == Sink::Operation_Creation) {
95 Trace() << "Replayed creation with remote id: " << remoteId;
90 if (remoteId.isEmpty()) { 96 if (remoteId.isEmpty()) {
91 Warning() << "Returned an empty remoteId from the creation"; 97 Warning() << "Returned an empty remoteId from the creation";
92 } else { 98 } else {
93 syncStore().recordRemoteId(type, uid, remoteId); 99 syncStore().recordRemoteId(type, uid, remoteId);
94 } 100 }
95 } else if (operation == Sink::Operation_Modification) { 101 } else if (operation == Sink::Operation_Modification) {
102 Trace() << "Replayed modification with remote id: " << remoteId;
96 if (remoteId.isEmpty()) { 103 if (remoteId.isEmpty()) {
97 Warning() << "Returned an empty remoteId from the creation"; 104 Warning() << "Returned an empty remoteId from the creation";
98 } else { 105 } else {
99 syncStore().updateRemoteId(type, uid, remoteId); 106 syncStore().updateRemoteId(type, uid, remoteId);
100 } 107 }
101 } else if (operation == Sink::Operation_Removal) { 108 } else if (operation == Sink::Operation_Removal) {
102 syncStore().removeRemoteId(type, uid, remoteId); 109 Trace() << "Replayed removal with remote id: " << oldRemoteId;
110 syncStore().removeRemoteId(type, uid, oldRemoteId);
103 } else { 111 } else {
104 Warning() << "Unkown operation" << operation; 112 ErrorMsg() << "Unkown operation" << operation;
105 } 113 }
106 114
115 mSyncStore.clear();
116 mEntityStore.clear();
107 mTransaction.abort(); 117 mTransaction.abort();
108 mSyncTransaction.commit(); 118 mSyncTransaction.commit();
119 }, [this](int errorCode, const QString &errorMessage) {
120 Warning() << "Failed to replay change: " << errorMessage;
109 mSyncStore.clear(); 121 mSyncStore.clear();
110 mEntityStore.clear(); 122 mEntityStore.clear();
111 }, [](int errorCode, const QString &errorMessage) { 123 mTransaction.abort();
112 Warning() << "Failed to replay change: " << errorMessage; 124 mSyncTransaction.commit();
113 }); 125 });
114} 126}
115 127
diff --git a/common/storage.h b/common/storage.h
index b051daa..87573e2 100644
--- a/common/storage.h
+++ b/common/storage.h
@@ -157,10 +157,7 @@ public:
157 return *this; 157 return *this;
158 } 158 }
159 159
160 operator bool() const 160 operator bool() const;
161 {
162 return (d != nullptr);
163 }
164 161
165 private: 162 private:
166 Transaction(Transaction &other); 163 Transaction(Transaction &other);
diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp
index 4ed9525..cc8b28d 100644
--- a/common/storage_lmdb.cpp
+++ b/common/storage_lmdb.cpp
@@ -347,7 +347,7 @@ class Storage::Transaction::Private
347{ 347{
348public: 348public:
349 Private(bool _requestRead, const std::function<void(const Storage::Error &error)> &_defaultErrorHandler, const QString &_name, MDB_env *_env) 349 Private(bool _requestRead, const std::function<void(const Storage::Error &error)> &_defaultErrorHandler, const QString &_name, MDB_env *_env)
350 : env(_env), requestedRead(_requestRead), defaultErrorHandler(_defaultErrorHandler), name(_name), implicitCommit(false), error(false), modificationCounter(0) 350 : env(_env), transaction(nullptr), requestedRead(_requestRead), defaultErrorHandler(_defaultErrorHandler), name(_name), implicitCommit(false), error(false), modificationCounter(0)
351 { 351 {
352 } 352 }
353 ~Private() 353 ~Private()
@@ -366,8 +366,15 @@ public:
366 366
367 void startTransaction() 367 void startTransaction()
368 { 368 {
369 // qDebug() << "Opening transaction " << requestedRead; 369 Q_ASSERT(!transaction);
370 // auto f = [](const char *msg, void *ctx) -> int {
371 // qDebug() << msg;
372 // return 0;
373 // };
374 // mdb_reader_list(env, f, nullptr);
375 // Trace_area("storage." + name.toLatin1()) << "Opening transaction " << requestedRead;
370 const int rc = mdb_txn_begin(env, NULL, requestedRead ? MDB_RDONLY : 0, &transaction); 376 const int rc = mdb_txn_begin(env, NULL, requestedRead ? MDB_RDONLY : 0, &transaction);
377 // Trace_area("storage." + name.toLatin1()) << "Started transaction " << mdb_txn_id(transaction) << transaction;
371 if (rc) { 378 if (rc) {
372 defaultErrorHandler(Error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening transaction: " + QByteArray(mdb_strerror(rc)))); 379 defaultErrorHandler(Error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening transaction: " + QByteArray(mdb_strerror(rc))));
373 } 380 }
@@ -387,22 +394,27 @@ Storage::Transaction::~Transaction()
387{ 394{
388 if (d && d->transaction) { 395 if (d && d->transaction) {
389 if (d->implicitCommit && !d->error) { 396 if (d->implicitCommit && !d->error) {
390 // qDebug() << "implicit commit";
391 commit(); 397 commit();
392 } else { 398 } else {
393 // qDebug() << "Aorting transaction"; 399 // Trace_area("storage." + d->name.toLatin1()) << "Aborting transaction" << mdb_txn_id(d->transaction) << d->transaction;
394 mdb_txn_abort(d->transaction); 400 mdb_txn_abort(d->transaction);
395 } 401 }
396 } 402 }
397 delete d; 403 delete d;
398} 404}
399 405
406Storage::Transaction::operator bool() const
407{
408 return (d && d->transaction);
409}
410
400bool Storage::Transaction::commit(const std::function<void(const Storage::Error &error)> &errorHandler) 411bool Storage::Transaction::commit(const std::function<void(const Storage::Error &error)> &errorHandler)
401{ 412{
402 if (!d || !d->transaction) { 413 if (!d || !d->transaction) {
403 return false; 414 return false;
404 } 415 }
405 416
417 // Trace_area("storage." + d->name.toLatin1()) << "Committing transaction" << mdb_txn_id(d->transaction) << d->transaction;
406 const int rc = mdb_txn_commit(d->transaction); 418 const int rc = mdb_txn_commit(d->transaction);
407 if (rc) { 419 if (rc) {
408 mdb_txn_abort(d->transaction); 420 mdb_txn_abort(d->transaction);
@@ -420,6 +432,7 @@ void Storage::Transaction::abort()
420 return; 432 return;
421 } 433 }
422 434
435 // Trace_area("storage." + d->name.toLatin1()) << "Aborting transaction" << mdb_txn_id(d->transaction) << d->transaction;
423 mdb_txn_abort(d->transaction); 436 mdb_txn_abort(d->transaction);
424 d->transaction = nullptr; 437 d->transaction = nullptr;
425} 438}
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index fb0baaa..b264662 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -54,7 +54,7 @@ void Synchronizer::enqueueCommand(int commandId, const QByteArray &data)
54EntityStore &Synchronizer::store() 54EntityStore &Synchronizer::store()
55{ 55{
56 if (!mEntityStore) { 56 if (!mEntityStore) {
57 mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction); 57 mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, transaction());
58 } 58 }
59 return *mEntityStore; 59 return *mEntityStore;
60} 60}
@@ -62,7 +62,7 @@ EntityStore &Synchronizer::store()
62RemoteIdMap &Synchronizer::syncStore() 62RemoteIdMap &Synchronizer::syncStore()
63{ 63{
64 if (!mSyncStore) { 64 if (!mSyncStore) {
65 mSyncStore = QSharedPointer<RemoteIdMap>::create(mSyncTransaction); 65 mSyncStore = QSharedPointer<RemoteIdMap>::create(syncTransaction());
66 } 66 }
67 return *mSyncStore; 67 return *mSyncStore;
68} 68}
@@ -125,7 +125,7 @@ void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::func
125 if (!remoteId.isEmpty()) { 125 if (!remoteId.isEmpty()) {
126 if (!exists(remoteId)) { 126 if (!exists(remoteId)) {
127 Trace() << "Found a removed entity: " << sinkId; 127 Trace() << "Found a removed entity: " << sinkId;
128 deleteEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, 128 deleteEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType,
129 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); }); 129 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); });
130 } 130 }
131 } 131 }
@@ -135,7 +135,7 @@ void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::func
135void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) 135void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
136{ 136{
137 Trace() << "Create or modify" << bufferType << remoteId; 137 Trace() << "Create or modify" << bufferType << remoteId;
138 auto mainDatabase = Storage::mainDatabase(mTransaction, bufferType); 138 auto mainDatabase = Storage::mainDatabase(transaction(), bufferType);
139 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); 139 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId);
140 const auto found = mainDatabase.contains(sinkId); 140 const auto found = mainDatabase.contains(sinkId);
141 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); 141 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType);
@@ -154,7 +154,7 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray
154 } 154 }
155 if (changed) { 155 if (changed) {
156 Trace() << "Found a modified entity: " << remoteId; 156 Trace() << "Found a modified entity: " << remoteId;
157 modifyEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, entity, *adaptorFactory, 157 modifyEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, entity, *adaptorFactory,
158 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); 158 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); });
159 } 159 }
160 } else { 160 } else {
@@ -165,12 +165,37 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray
165 165
166KAsync::Job<void> Synchronizer::synchronize() 166KAsync::Job<void> Synchronizer::synchronize()
167{ 167{
168 mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); 168 Trace() << "Synchronizing";
169 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite);
170 return synchronizeWithSource().then<void>([this]() { 169 return synchronizeWithSource().then<void>([this]() {
171 mTransaction.abort();
172 mSyncTransaction.commit();
173 mSyncStore.clear(); 170 mSyncStore.clear();
174 mEntityStore.clear(); 171 mEntityStore.clear();
175 }); 172 });
176} 173}
174
175void Synchronizer::commit()
176{
177 mTransaction.abort();
178}
179
180void Synchronizer::commitSync()
181{
182 mSyncTransaction.commit();
183}
184
185Sink::Storage::Transaction &Synchronizer::transaction()
186{
187 if (!mTransaction) {
188 Trace() << "Starting transaction";
189 mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly);
190 }
191 return mTransaction;
192}
193
194Sink::Storage::Transaction &Synchronizer::syncTransaction()
195{
196 if (!mSyncTransaction) {
197 Trace() << "Starting transaction";
198 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite);
199 }
200 return mSyncTransaction;
201}
diff --git a/common/synchronizer.h b/common/synchronizer.h
index 61bca7d..17e7003 100644
--- a/common/synchronizer.h
+++ b/common/synchronizer.h
@@ -41,6 +41,17 @@ public:
41 void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback); 41 void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback);
42 KAsync::Job<void> synchronize(); 42 KAsync::Job<void> synchronize();
43 43
44 //Read only access to main storage
45 EntityStore &store();
46
47 //Read/Write access to sync storage
48 RemoteIdMap &syncStore();
49
50 void commit();
51 void commitSync();
52 Sink::Storage::Transaction &transaction();
53 Sink::Storage::Transaction &syncTransaction();
54
44protected: 55protected:
45 ///Calls the callback to enqueue the command 56 ///Calls the callback to enqueue the command
46 void enqueueCommand(int commandId, const QByteArray &data); 57 void enqueueCommand(int commandId, const QByteArray &data);
@@ -71,12 +82,6 @@ protected:
71 */ 82 */
72 void createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); 83 void createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity);
73 84
74 //Read only access to main storage
75 EntityStore &store();
76
77 //Read/Write access to sync storage
78 RemoteIdMap &syncStore();
79
80 virtual KAsync::Job<void> synchronizeWithSource() = 0; 85 virtual KAsync::Job<void> synchronizeWithSource() = 0;
81 86
82private: 87private:
diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp
index 1708cc5..5f42262 100644
--- a/examples/dummyresource/resourcefactory.cpp
+++ b/examples/dummyresource/resourcefactory.cpp
@@ -126,16 +126,15 @@ class DummySynchronizer : public Sink::Synchronizer {
126}; 126};
127 127
128DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QSharedPointer<Sink::Pipeline> &pipeline) 128DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QSharedPointer<Sink::Pipeline> &pipeline)
129 : Sink::GenericResource(PLUGIN_NAME, instanceIdentifier, pipeline, QSharedPointer<Sink::NullChangeReplay>::create(), QSharedPointer<DummySynchronizer>::create(PLUGIN_NAME, instanceIdentifier)), 129 : Sink::GenericResource(PLUGIN_NAME, instanceIdentifier, pipeline)
130 mEventAdaptorFactory(QSharedPointer<DummyEventAdaptorFactory>::create()),
131 mMailAdaptorFactory(QSharedPointer<DummyMailAdaptorFactory>::create()),
132 mFolderAdaptorFactory(QSharedPointer<DummyFolderAdaptorFactory>::create())
133{ 130{
134 addType(ENTITY_TYPE_MAIL, mMailAdaptorFactory, 131 setupSynchronizer(QSharedPointer<DummySynchronizer>::create(PLUGIN_NAME, instanceIdentifier));
132 setupChangereplay(QSharedPointer<Sink::NullChangeReplay>::create());
133 setupPreprocessors(ENTITY_TYPE_MAIL,
135 QVector<Sink::Preprocessor*>() << new DefaultIndexUpdater<Sink::ApplicationDomain::Mail>); 134 QVector<Sink::Preprocessor*>() << new DefaultIndexUpdater<Sink::ApplicationDomain::Mail>);
136 addType(ENTITY_TYPE_FOLDER, mFolderAdaptorFactory, 135 setupPreprocessors(ENTITY_TYPE_FOLDER,
137 QVector<Sink::Preprocessor*>() << new DefaultIndexUpdater<Sink::ApplicationDomain::Folder>); 136 QVector<Sink::Preprocessor*>() << new DefaultIndexUpdater<Sink::ApplicationDomain::Folder>);
138 addType(ENTITY_TYPE_EVENT, mEventAdaptorFactory, 137 setupPreprocessors(ENTITY_TYPE_EVENT,
139 QVector<Sink::Preprocessor*>() << new DefaultIndexUpdater<Sink::ApplicationDomain::Event>); 138 QVector<Sink::Preprocessor*>() << new DefaultIndexUpdater<Sink::ApplicationDomain::Event>);
140} 139}
141 140
diff --git a/examples/dummyresource/resourcefactory.h b/examples/dummyresource/resourcefactory.h
index f73eb32..e8757db 100644
--- a/examples/dummyresource/resourcefactory.h
+++ b/examples/dummyresource/resourcefactory.h
@@ -47,10 +47,6 @@ private:
47 Sink::ApplicationDomain::Mail::Ptr createMail(const QByteArray &rid, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &); 47 Sink::ApplicationDomain::Mail::Ptr createMail(const QByteArray &rid, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &);
48 Sink::ApplicationDomain::Folder::Ptr createFolder(const QByteArray &rid, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &); 48 Sink::ApplicationDomain::Folder::Ptr createFolder(const QByteArray &rid, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &);
49 void synchronize(const QByteArray &bufferType, const QMap<QString, QMap<QString, QVariant> > &data, Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<Sink::ApplicationDomain::ApplicationDomainType::Ptr(const QByteArray &ridBuffer, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &)> createEntity); 49 void synchronize(const QByteArray &bufferType, const QMap<QString, QMap<QString, QVariant> > &data, Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<Sink::ApplicationDomain::ApplicationDomainType::Ptr(const QByteArray &ridBuffer, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &)> createEntity);
50
51 QSharedPointer<DummyEventAdaptorFactory> mEventAdaptorFactory;
52 QSharedPointer<DummyMailAdaptorFactory> mMailAdaptorFactory;
53 QSharedPointer<DummyFolderAdaptorFactory> mFolderAdaptorFactory;
54}; 50};
55 51
56class DummyResourceFactory : public Sink::ResourceFactory 52class DummyResourceFactory : public Sink::ResourceFactory
diff --git a/tests/testimplementations.h b/tests/testimplementations.h
index 197602c..d188c0c 100644
--- a/tests/testimplementations.h
+++ b/tests/testimplementations.h
@@ -107,7 +107,7 @@ public:
107class TestResource : public Sink::GenericResource 107class TestResource : public Sink::GenericResource
108{ 108{
109public: 109public:
110 TestResource(const QByteArray &instanceIdentifier, QSharedPointer<Sink::Pipeline> pipeline) : Sink::GenericResource("test", instanceIdentifier, pipeline, QSharedPointer<Sink::ChangeReplay>(), QSharedPointer<Sink::Synchronizer>()) 110 TestResource(const QByteArray &instanceIdentifier, QSharedPointer<Sink::Pipeline> pipeline) : Sink::GenericResource("test", instanceIdentifier, pipeline)
111 { 111 {
112 } 112 }
113 113