summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/CMakeLists.txt1
-rw-r--r--common/genericresource.cpp29
-rw-r--r--common/genericresource.h4
-rw-r--r--common/sourcewriteback.cpp146
-rw-r--r--common/sourcewriteback.h71
-rw-r--r--common/synchronizer.cpp94
-rw-r--r--common/synchronizer.h15
7 files changed, 120 insertions, 240 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index a08be8a..5ba524b 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -70,7 +70,6 @@ set(command_SRCS
70 adaptorfactoryregistry.cpp 70 adaptorfactoryregistry.cpp
71 synchronizer.cpp 71 synchronizer.cpp
72 remoteidmap.cpp 72 remoteidmap.cpp
73 sourcewriteback.cpp
74 mailpreprocessor.cpp 73 mailpreprocessor.cpp
75 specialpurposepreprocessor.cpp 74 specialpurposepreprocessor.cpp
76 datastorequery.cpp 75 datastorequery.cpp
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index c4c8bc6..746fa33 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -292,14 +292,14 @@ KAsync::Job<void> GenericResource::inspect(
292 292
293void GenericResource::enableChangeReplay(bool enable) 293void GenericResource::enableChangeReplay(bool enable)
294{ 294{
295 Q_ASSERT(mChangeReplay); 295 Q_ASSERT(mSynchronizer);
296 if (enable) { 296 if (enable) {
297 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); 297 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection);
298 QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); 298 QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
299 QMetaObject::invokeMethod(mChangeReplay.data(), "revisionChanged", Qt::QueuedConnection); 299 QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection);
300 } else { 300 } else {
301 QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged); 301 QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged);
302 QObject::disconnect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); 302 QObject::disconnect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
303 } 303 }
304} 304}
305 305
@@ -314,13 +314,8 @@ void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &sync
314 mSynchronizer->setup([this](int commandId, const QByteArray &data) { 314 mSynchronizer->setup([this](int commandId, const QByteArray &data) {
315 enqueueCommand(mSynchronizerQueue, commandId, data); 315 enqueueCommand(mSynchronizerQueue, commandId, data);
316 }, mSynchronizerQueue); 316 }, mSynchronizerQueue);
317}
318
319void GenericResource::setupChangereplay(const QSharedPointer<ChangeReplay> &changeReplay)
320{
321 mChangeReplay = changeReplay;
322 { 317 {
323 auto ret = QObject::connect(mChangeReplay.data(), &ChangeReplay::replayingChanges, [this]() { 318 auto ret = QObject::connect(mSynchronizer.data(), &Synchronizer::replayingChanges, [this]() {
324 Sink::Notification n; 319 Sink::Notification n;
325 n.id = "changereplay"; 320 n.id = "changereplay";
326 n.type = Sink::Notification::Status; 321 n.type = Sink::Notification::Status;
@@ -331,7 +326,7 @@ void GenericResource::setupChangereplay(const QSharedPointer<ChangeReplay> &chan
331 Q_ASSERT(ret); 326 Q_ASSERT(ret);
332 } 327 }
333 { 328 {
334 auto ret = QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, [this]() { 329 auto ret = QObject::connect(mSynchronizer.data(), &Synchronizer::changesReplayed, [this]() {
335 Sink::Notification n; 330 Sink::Notification n;
336 n.id = "changereplay"; 331 n.id = "changereplay";
337 n.type = Sink::Notification::Status; 332 n.type = Sink::Notification::Status;
@@ -342,7 +337,7 @@ void GenericResource::setupChangereplay(const QSharedPointer<ChangeReplay> &chan
342 Q_ASSERT(ret); 337 Q_ASSERT(ret);
343 } 338 }
344 339
345 mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision()); 340 mProcessor->setOldestUsedRevision(mSynchronizer->getLastReplayedRevision());
346 enableChangeReplay(true); 341 enableChangeReplay(true);
347} 342}
348 343
@@ -459,11 +454,11 @@ KAsync::Job<void> GenericResource::processAllMessages()
459 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); }) 454 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); })
460 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); }) 455 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); })
461 .then<void>([this](KAsync::Future<void> &f) { 456 .then<void>([this](KAsync::Future<void> &f) {
462 if (mChangeReplay->allChangesReplayed()) { 457 if (mSynchronizer->allChangesReplayed()) {
463 f.setFinished(); 458 f.setFinished();
464 } else { 459 } else {
465 auto context = new QObject; 460 auto context = new QObject;
466 QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, context, [&f, context]() { 461 QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, context, [&f, context]() {
467 delete context; 462 delete context;
468 f.setFinished(); 463 f.setFinished();
469 }); 464 });
@@ -473,7 +468,7 @@ KAsync::Job<void> GenericResource::processAllMessages()
473 468
474void GenericResource::updateLowerBoundRevision() 469void GenericResource::updateLowerBoundRevision()
475{ 470{
476 mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mChangeReplay->getLastReplayedRevision())); 471 mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mSynchronizer->getLastReplayedRevision()));
477} 472}
478 473
479void GenericResource::setLowerBoundRevision(qint64 revision) 474void GenericResource::setLowerBoundRevision(qint64 revision)
diff --git a/common/genericresource.h b/common/genericresource.h
index 3736c8f..7e0f5ad 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -24,7 +24,7 @@
24#include <messagequeue.h> 24#include <messagequeue.h>
25#include <flatbuffers/flatbuffers.h> 25#include <flatbuffers/flatbuffers.h>
26#include <domainadaptor.h> 26#include <domainadaptor.h>
27#include "changereplay.h" 27#include <resourcecontext.h>
28 28
29#include <QTimer> 29#include <QTimer>
30 30
@@ -66,7 +66,6 @@ protected:
66 66
67 void setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors); 67 void setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors);
68 void setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer); 68 void setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer);
69 void setupChangereplay(const QSharedPointer<ChangeReplay> &changeReplay);
70 69
71 void onProcessorError(int errorCode, const QString &errorMessage); 70 void onProcessorError(int errorCode, const QString &errorMessage);
72 void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); 71 void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data);
@@ -78,7 +77,6 @@ protected:
78 77
79private: 78private:
80 std::unique_ptr<CommandProcessor> mProcessor; 79 std::unique_ptr<CommandProcessor> mProcessor;
81 QSharedPointer<ChangeReplay> mChangeReplay;
82 QSharedPointer<Synchronizer> mSynchronizer; 80 QSharedPointer<Synchronizer> mSynchronizer;
83 int mError; 81 int mError;
84 QTimer mCommitQueueTimer; 82 QTimer mCommitQueueTimer;
diff --git a/common/sourcewriteback.cpp b/common/sourcewriteback.cpp
deleted file mode 100644
index e2994d1..0000000
--- a/common/sourcewriteback.cpp
+++ /dev/null
@@ -1,146 +0,0 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#include "sourcewriteback.h"
21
22#include "definitions.h"
23#include "log.h"
24#include "bufferutils.h"
25#include "entitybuffer.h"
26#include "entity_generated.h"
27
28#define ENTITY_TYPE_MAIL "mail"
29#define ENTITY_TYPE_FOLDER "folder"
30
31SINK_DEBUG_AREA("sourcewriteback")
32
33using namespace Sink;
34
35SourceWriteBack::SourceWriteBack(const ResourceContext &context)
36 : ChangeReplay(context),
37 mResourceContext(context),
38 mSyncStorage(Sink::storageLocation(), context.instanceId() + ".synchronization", Sink::Storage::DataStore::ReadWrite),
39 mEntityStore(QSharedPointer<Storage::EntityStore>::create(mResourceContext))
40{
41
42}
43
44Storage::EntityStore &SourceWriteBack::store()
45{
46 return *mEntityStore;
47}
48
49RemoteIdMap &SourceWriteBack::syncStore()
50{
51 if (!mSyncStore) {
52 mSyncStore = QSharedPointer<RemoteIdMap>::create(mSyncTransaction);
53 }
54 return *mSyncStore;
55}
56
57bool SourceWriteBack::canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value)
58{
59 Sink::EntityBuffer buffer(value);
60 const Sink::Entity &entity = buffer.entity();
61 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
62 Q_ASSERT(metadataBuffer);
63 if (!metadataBuffer->replayToSource()) {
64 SinkTrace() << "Change is coming from the source";
65 }
66 return metadataBuffer->replayToSource();
67}
68
69KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value)
70{
71 SinkTrace() << "Replaying" << type << key;
72
73 Sink::EntityBuffer buffer(value);
74 const Sink::Entity &entity = buffer.entity();
75 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
76 Q_ASSERT(metadataBuffer);
77 Q_ASSERT(!mSyncStore);
78 Q_ASSERT(!mSyncTransaction);
79 mEntityStore->startTransaction(Storage::DataStore::ReadOnly);
80 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::ReadWrite);
81
82 // const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
83 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation;
84 const auto uid = Sink::Storage::DataStore::uidFromKey(key);
85 const auto modifiedProperties = metadataBuffer->modifiedProperties() ? BufferUtils::fromVector(*metadataBuffer->modifiedProperties()) : QByteArrayList();
86 QByteArray oldRemoteId;
87
88 if (operation != Sink::Operation_Creation) {
89 oldRemoteId = syncStore().resolveLocalId(type, uid);
90 if (oldRemoteId.isEmpty()) {
91 SinkWarning() << "Couldn't find the remote id for: " << type << uid;
92 return KAsync::error<void>(1, "Couldn't find the remote id.");
93 }
94 }
95 SinkTrace() << "Replaying " << key << type << uid << oldRemoteId;
96
97 KAsync::Job<QByteArray> job = KAsync::null<QByteArray>();
98 if (type == ENTITY_TYPE_FOLDER) {
99 auto folder = store().readEntity<ApplicationDomain::Folder>(key);
100 job = replay(folder, operation, oldRemoteId, modifiedProperties);
101 } else if (type == ENTITY_TYPE_MAIL) {
102 auto mail = store().readEntity<ApplicationDomain::Mail>(key);
103 job = replay(mail, operation, oldRemoteId, modifiedProperties);
104 }
105
106 return job.syncThen<void, QByteArray>([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) {
107 if (operation == Sink::Operation_Creation) {
108 SinkTrace() << "Replayed creation with remote id: " << remoteId;
109 if (remoteId.isEmpty()) {
110 SinkWarning() << "Returned an empty remoteId from the creation";
111 } else {
112 syncStore().recordRemoteId(type, uid, remoteId);
113 }
114 } else if (operation == Sink::Operation_Modification) {
115 SinkTrace() << "Replayed modification with remote id: " << remoteId;
116 if (remoteId.isEmpty()) {
117 SinkWarning() << "Returned an empty remoteId from the creation";
118 } else {
119 syncStore().updateRemoteId(type, uid, remoteId);
120 }
121 } else if (operation == Sink::Operation_Removal) {
122 SinkTrace() << "Replayed removal with remote id: " << oldRemoteId;
123 syncStore().removeRemoteId(type, uid, oldRemoteId);
124 } else {
125 SinkError() << "Unkown operation" << operation;
126 }
127 })
128 .syncThen<void>([this](const KAsync::Error &error) {
129 if (error) {
130 SinkWarning() << "Failed to replay change: " << error.errorMessage;
131 }
132 mSyncStore.clear();
133 mSyncTransaction.commit();
134 mEntityStore->abortTransaction();
135 });
136}
137
138KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &, const QList<QByteArray> &)
139{
140 return KAsync::null<QByteArray>();
141}
142
143KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &, const QList<QByteArray> &)
144{
145 return KAsync::null<QByteArray>();
146}
diff --git a/common/sourcewriteback.h b/common/sourcewriteback.h
deleted file mode 100644
index cf393e4..0000000
--- a/common/sourcewriteback.h
+++ /dev/null
@@ -1,71 +0,0 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#pragma once
21
22#include "sink_export.h"
23
24#include "changereplay.h"
25#include "storage.h"
26#include "storage/entitystore.h"
27#include "remoteidmap.h"
28#include "metadata_generated.h"
29
30namespace Sink {
31
32/**
33 * Replay changes to the source
34 */
35class SINK_EXPORT SourceWriteBack : public ChangeReplay
36{
37public:
38 SourceWriteBack(const ResourceContext &resourceContext);
39
40protected:
41 ///Base implementation calls the replay$Type calls
42 virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE;
43 virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE;
44
45protected:
46 ///Implement to write back changes to the server
47 virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId, const QList<QByteArray> &);
48 virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId, const QList<QByteArray> &);
49
50 //Read/Write access to sync storage
51 RemoteIdMap &syncStore();
52
53 template <typename T>
54 T getPrevious(const T &entity)
55 {
56 return store().readPrevious<T>(entity.identifier(), entity.revision());
57 }
58
59private:
60 //Read only access to main storage
61 Storage::EntityStore &store();
62 ResourceContext mResourceContext;
63 Sink::Storage::DataStore mSyncStorage;
64 QSharedPointer<RemoteIdMap> mSyncStore;
65 QSharedPointer<Storage::EntityStore> mEntityStore;
66 Sink::Storage::DataStore::Transaction mSyncTransaction;
67 QByteArray mResourceType;
68 QByteArray mResourceInstanceIdentifier;
69};
70
71}
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 713387e..10acefc 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -33,7 +33,8 @@ SINK_DEBUG_AREA("synchronizer")
33using namespace Sink; 33using namespace Sink;
34 34
35Synchronizer::Synchronizer(const Sink::ResourceContext &context) 35Synchronizer::Synchronizer(const Sink::ResourceContext &context)
36 : mResourceContext(context), 36 : ChangeReplay(context),
37 mResourceContext(context),
37 mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext)), 38 mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext)),
38 mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite) 39 mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite)
39{ 40{
@@ -310,6 +311,97 @@ Sink::Storage::DataStore::DataStore::Transaction &Synchronizer::syncTransaction(
310 return mSyncTransaction; 311 return mSyncTransaction;
311} 312}
312 313
314bool Synchronizer::canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value)
315{
316 Sink::EntityBuffer buffer(value);
317 const Sink::Entity &entity = buffer.entity();
318 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
319 Q_ASSERT(metadataBuffer);
320 if (!metadataBuffer->replayToSource()) {
321 SinkTrace() << "Change is coming from the source";
322 }
323 return metadataBuffer->replayToSource();
324}
325
326KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value)
327{
328 SinkTrace() << "Replaying" << type << key;
329
330 Sink::EntityBuffer buffer(value);
331 const Sink::Entity &entity = buffer.entity();
332 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
333 Q_ASSERT(metadataBuffer);
334 Q_ASSERT(!mSyncStore);
335 Q_ASSERT(!mSyncTransaction);
336 mEntityStore->startTransaction(Storage::DataStore::ReadOnly);
337 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::ReadWrite);
338
339 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation;
340 const auto uid = Sink::Storage::DataStore::uidFromKey(key);
341 const auto modifiedProperties = metadataBuffer->modifiedProperties() ? BufferUtils::fromVector(*metadataBuffer->modifiedProperties()) : QByteArrayList();
342 QByteArray oldRemoteId;
343
344 if (operation != Sink::Operation_Creation) {
345 oldRemoteId = syncStore().resolveLocalId(type, uid);
346 if (oldRemoteId.isEmpty()) {
347 SinkWarning() << "Couldn't find the remote id for: " << type << uid;
348 return KAsync::error<void>(1, "Couldn't find the remote id.");
349 }
350 }
351 SinkTrace() << "Replaying " << key << type << uid << oldRemoteId;
352
353 KAsync::Job<QByteArray> job = KAsync::null<QByteArray>();
354 //TODO This requires supporting every domain type here as well. Can we solve this better so we can do the dispatch somewhere centrally?
355 if (type == ApplicationDomain::getTypeName<ApplicationDomain::Folder>()) {
356 auto folder = store().readEntity<ApplicationDomain::Folder>(key);
357 job = replay(folder, operation, oldRemoteId, modifiedProperties);
358 } else if (type == ApplicationDomain::getTypeName<ApplicationDomain::Mail>()) {
359 auto mail = store().readEntity<ApplicationDomain::Mail>(key);
360 job = replay(mail, operation, oldRemoteId, modifiedProperties);
361 }
362
363 return job.syncThen<void, QByteArray>([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) {
364 if (operation == Sink::Operation_Creation) {
365 SinkTrace() << "Replayed creation with remote id: " << remoteId;
366 if (remoteId.isEmpty()) {
367 SinkWarning() << "Returned an empty remoteId from the creation";
368 } else {
369 syncStore().recordRemoteId(type, uid, remoteId);
370 }
371 } else if (operation == Sink::Operation_Modification) {
372 SinkTrace() << "Replayed modification with remote id: " << remoteId;
373 if (remoteId.isEmpty()) {
374 SinkWarning() << "Returned an empty remoteId from the creation";
375 } else {
376 syncStore().updateRemoteId(type, uid, remoteId);
377 }
378 } else if (operation == Sink::Operation_Removal) {
379 SinkTrace() << "Replayed removal with remote id: " << oldRemoteId;
380 syncStore().removeRemoteId(type, uid, oldRemoteId);
381 } else {
382 SinkError() << "Unkown operation" << operation;
383 }
384 })
385 .syncThen<void>([this](const KAsync::Error &error) {
386 if (error) {
387 SinkWarning() << "Failed to replay change: " << error.errorMessage;
388 }
389 mSyncStore.clear();
390 mSyncTransaction.commit();
391 mEntityStore->abortTransaction();
392 });
393}
394
395KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &, const QList<QByteArray> &)
396{
397 return KAsync::null<QByteArray>();
398}
399
400KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &, const QList<QByteArray> &)
401{
402 return KAsync::null<QByteArray>();
403}
404
313#define REGISTER_TYPE(T) \ 405#define REGISTER_TYPE(T) \
314 template void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const T &entity, const QHash<QByteArray, Sink::Query::Comparator> &mergeCriteria); \ 406 template void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const T &entity, const QHash<QByteArray, Sink::Query::Comparator> &mergeCriteria); \
315 template void Synchronizer::modify(const T &entity); 407 template void Synchronizer::modify(const T &entity);
diff --git a/common/synchronizer.h b/common/synchronizer.h
index 47518ee..0a51f54 100644
--- a/common/synchronizer.h
+++ b/common/synchronizer.h
@@ -27,6 +27,8 @@
27#include <messagequeue.h> 27#include <messagequeue.h>
28#include <storage.h> 28#include <storage.h>
29#include <storage/entitystore.h> 29#include <storage/entitystore.h>
30#include "changereplay.h"
31#include "remoteidmap.h"
30 32
31namespace Sink { 33namespace Sink {
32class RemoteIdMap; 34class RemoteIdMap;
@@ -34,8 +36,9 @@ class RemoteIdMap;
34/** 36/**
35 * Synchronize and add what we don't already have to local queue 37 * Synchronize and add what we don't already have to local queue
36 */ 38 */
37class SINK_EXPORT Synchronizer 39class SINK_EXPORT Synchronizer : public ChangeReplay
38{ 40{
41 Q_OBJECT
39public: 42public:
40 Synchronizer(const Sink::ResourceContext &resourceContext); 43 Synchronizer(const Sink::ResourceContext &resourceContext);
41 virtual ~Synchronizer(); 44 virtual ~Synchronizer();
@@ -53,6 +56,16 @@ public:
53 Sink::Storage::DataStore::Transaction &syncTransaction(); 56 Sink::Storage::DataStore::Transaction &syncTransaction();
54 57
55protected: 58protected:
59 ///Base implementation calls the replay$Type calls
60 virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE;
61 virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE;
62
63protected:
64 ///Implement to write back changes to the server
65 virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId, const QList<QByteArray> &);
66 virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId, const QList<QByteArray> &);
67
68protected:
56 ///Calls the callback to enqueue the command 69 ///Calls the callback to enqueue the command
57 void enqueueCommand(int commandId, const QByteArray &data); 70 void enqueueCommand(int commandId, const QByteArray &data);
58 71