From 1d713d9e2dbaf27de9da087f9270d260dfc40c31 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 21 Nov 2016 23:13:38 +0100 Subject: Folded the SourceWriteback into the Synchronizer. By concentrating all communication to the source in one place we get rid of several oddities. * Quite a bit of duplication since both need access to the synchronizationStore and the source. * We currently have an akward locking in place because both classes access the ync store. This is not easier to resolve cleanly. * The live of resource implementers becomes easier. * An implementation could elect to not use changereplay and always do a full sync... (maybe?) --- common/synchronizer.cpp | 94 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 93 insertions(+), 1 deletion(-) (limited to 'common/synchronizer.cpp') diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 713387e..10acefc 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp @@ -33,7 +33,8 @@ SINK_DEBUG_AREA("synchronizer") using namespace Sink; Synchronizer::Synchronizer(const Sink::ResourceContext &context) - : mResourceContext(context), + : ChangeReplay(context), + mResourceContext(context), mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext)), mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite) { @@ -310,6 +311,97 @@ Sink::Storage::DataStore::DataStore::Transaction &Synchronizer::syncTransaction( return mSyncTransaction; } +bool Synchronizer::canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) +{ + Sink::EntityBuffer buffer(value); + const Sink::Entity &entity = buffer.entity(); + const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); + Q_ASSERT(metadataBuffer); + if (!metadataBuffer->replayToSource()) { + SinkTrace() << "Change is coming from the source"; + } + return metadataBuffer->replayToSource(); +} + +KAsync::Job Synchronizer::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) +{ + SinkTrace() << "Replaying" << type << key; + + Sink::EntityBuffer buffer(value); + const Sink::Entity &entity = buffer.entity(); + const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); + Q_ASSERT(metadataBuffer); + Q_ASSERT(!mSyncStore); + Q_ASSERT(!mSyncTransaction); + mEntityStore->startTransaction(Storage::DataStore::ReadOnly); + mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::ReadWrite); + + const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; + const auto uid = Sink::Storage::DataStore::uidFromKey(key); + const auto modifiedProperties = metadataBuffer->modifiedProperties() ? BufferUtils::fromVector(*metadataBuffer->modifiedProperties()) : QByteArrayList(); + QByteArray oldRemoteId; + + if (operation != Sink::Operation_Creation) { + oldRemoteId = syncStore().resolveLocalId(type, uid); + if (oldRemoteId.isEmpty()) { + SinkWarning() << "Couldn't find the remote id for: " << type << uid; + return KAsync::error(1, "Couldn't find the remote id."); + } + } + SinkTrace() << "Replaying " << key << type << uid << oldRemoteId; + + KAsync::Job job = KAsync::null(); + //TODO This requires supporting every domain type here as well. Can we solve this better so we can do the dispatch somewhere centrally? + if (type == ApplicationDomain::getTypeName()) { + auto folder = store().readEntity(key); + job = replay(folder, operation, oldRemoteId, modifiedProperties); + } else if (type == ApplicationDomain::getTypeName()) { + auto mail = store().readEntity(key); + job = replay(mail, operation, oldRemoteId, modifiedProperties); + } + + return job.syncThen([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { + if (operation == Sink::Operation_Creation) { + SinkTrace() << "Replayed creation with remote id: " << remoteId; + if (remoteId.isEmpty()) { + SinkWarning() << "Returned an empty remoteId from the creation"; + } else { + syncStore().recordRemoteId(type, uid, remoteId); + } + } else if (operation == Sink::Operation_Modification) { + SinkTrace() << "Replayed modification with remote id: " << remoteId; + if (remoteId.isEmpty()) { + SinkWarning() << "Returned an empty remoteId from the creation"; + } else { + syncStore().updateRemoteId(type, uid, remoteId); + } + } else if (operation == Sink::Operation_Removal) { + SinkTrace() << "Replayed removal with remote id: " << oldRemoteId; + syncStore().removeRemoteId(type, uid, oldRemoteId); + } else { + SinkError() << "Unkown operation" << operation; + } + }) + .syncThen([this](const KAsync::Error &error) { + if (error) { + SinkWarning() << "Failed to replay change: " << error.errorMessage; + } + mSyncStore.clear(); + mSyncTransaction.commit(); + mEntityStore->abortTransaction(); + }); +} + +KAsync::Job Synchronizer::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &, const QList &) +{ + return KAsync::null(); +} + +KAsync::Job Synchronizer::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &, const QList &) +{ + return KAsync::null(); +} + #define REGISTER_TYPE(T) \ template void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const T &entity, const QHash &mergeCriteria); \ template void Synchronizer::modify(const T &entity); -- cgit v1.2.3