diff options
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r-- | common/synchronizer.cpp | 148 |
1 files changed, 58 insertions, 90 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 53db82f..5ddd77c 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp | |||
@@ -24,7 +24,7 @@ | |||
24 | #include "bufferutils.h" | 24 | #include "bufferutils.h" |
25 | #include "entitystore.h" | 25 | #include "entitystore.h" |
26 | #include "remoteidmap.h" | 26 | #include "remoteidmap.h" |
27 | #include "adaptorfactoryregistry.h" | 27 | #include "entityreader.h" |
28 | #include "createentity_generated.h" | 28 | #include "createentity_generated.h" |
29 | #include "modifyentity_generated.h" | 29 | #include "modifyentity_generated.h" |
30 | #include "deleteentity_generated.h" | 30 | #include "deleteentity_generated.h" |
@@ -33,13 +33,12 @@ SINK_DEBUG_AREA("synchronizer") | |||
33 | 33 | ||
34 | using namespace Sink; | 34 | using namespace Sink; |
35 | 35 | ||
36 | Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) | 36 | Synchronizer::Synchronizer(const Sink::ResourceContext &context) |
37 | : mStorage(Sink::storageLocation(), resourceInstanceIdentifier, Sink::Storage::ReadOnly), | 37 | : mResourceContext(context), |
38 | mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), | 38 | mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext)), |
39 | mResourceType(resourceType), | 39 | mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite) |
40 | mResourceInstanceIdentifier(resourceInstanceIdentifier) | ||
41 | { | 40 | { |
42 | SinkTrace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier; | 41 | SinkTrace() << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); |
43 | } | 42 | } |
44 | 43 | ||
45 | Synchronizer::~Synchronizer() | 44 | Synchronizer::~Synchronizer() |
@@ -59,11 +58,9 @@ void Synchronizer::enqueueCommand(int commandId, const QByteArray &data) | |||
59 | mEnqueue(commandId, data); | 58 | mEnqueue(commandId, data); |
60 | } | 59 | } |
61 | 60 | ||
62 | EntityStore &Synchronizer::store() | 61 | Storage::EntityStore &Synchronizer::store() |
63 | { | 62 | { |
64 | if (!mEntityStore) { | 63 | mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly); |
65 | mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, transaction()); | ||
66 | } | ||
67 | return *mEntityStore; | 64 | return *mEntityStore; |
68 | } | 65 | } |
69 | 66 | ||
@@ -75,13 +72,12 @@ RemoteIdMap &Synchronizer::syncStore() | |||
75 | return *mSyncStore; | 72 | return *mSyncStore; |
76 | } | 73 | } |
77 | 74 | ||
78 | void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | 75 | void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject) |
79 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | ||
80 | { | 76 | { |
81 | // These changes are coming from the source | 77 | // These changes are coming from the source |
82 | const auto replayToSource = false; | 78 | const auto replayToSource = false; |
83 | flatbuffers::FlatBufferBuilder entityFbb; | 79 | flatbuffers::FlatBufferBuilder entityFbb; |
84 | adaptorFactory.createBuffer(domainObject, entityFbb); | 80 | mResourceContext.adaptorFactory(bufferType).createBuffer(domainObject, entityFbb); |
85 | flatbuffers::FlatBufferBuilder fbb; | 81 | flatbuffers::FlatBufferBuilder fbb; |
86 | // This is the resource type and not the domain type | 82 | // This is the resource type and not the domain type |
87 | auto entityId = fbb.CreateString(sinkId.toStdString()); | 83 | auto entityId = fbb.CreateString(sinkId.toStdString()); |
@@ -89,18 +85,17 @@ void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &buff | |||
89 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); | 85 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); |
90 | auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); | 86 | auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); |
91 | Sink::Commands::FinishCreateEntityBuffer(fbb, location); | 87 | Sink::Commands::FinishCreateEntityBuffer(fbb, location); |
92 | callback(BufferUtils::extractBuffer(fbb)); | 88 | enqueueCommand(Sink::Commands::CreateEntityCommand, BufferUtils::extractBuffer(fbb)); |
93 | } | 89 | } |
94 | 90 | ||
95 | void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | 91 | void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject) |
96 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | ||
97 | { | 92 | { |
98 | // FIXME removals | 93 | // FIXME removals |
99 | QByteArrayList deletedProperties; | 94 | QByteArrayList deletedProperties; |
100 | // These changes are coming from the source | 95 | // These changes are coming from the source |
101 | const auto replayToSource = false; | 96 | const auto replayToSource = false; |
102 | flatbuffers::FlatBufferBuilder entityFbb; | 97 | flatbuffers::FlatBufferBuilder entityFbb; |
103 | adaptorFactory.createBuffer(domainObject, entityFbb); | 98 | mResourceContext.adaptorFactory(bufferType).createBuffer(domainObject, entityFbb); |
104 | flatbuffers::FlatBufferBuilder fbb; | 99 | flatbuffers::FlatBufferBuilder fbb; |
105 | auto entityId = fbb.CreateString(sinkId.toStdString()); | 100 | auto entityId = fbb.CreateString(sinkId.toStdString()); |
106 | auto modifiedProperties = BufferUtils::toVector(fbb, domainObject.changedProperties()); | 101 | auto modifiedProperties = BufferUtils::toVector(fbb, domainObject.changedProperties()); |
@@ -110,10 +105,10 @@ void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const | |||
110 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); | 105 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); |
111 | auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta, replayToSource, modifiedProperties); | 106 | auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta, replayToSource, modifiedProperties); |
112 | Sink::Commands::FinishModifyEntityBuffer(fbb, location); | 107 | Sink::Commands::FinishModifyEntityBuffer(fbb, location); |
113 | callback(BufferUtils::extractBuffer(fbb)); | 108 | enqueueCommand(Sink::Commands::ModifyEntityCommand, BufferUtils::extractBuffer(fbb)); |
114 | } | 109 | } |
115 | 110 | ||
116 | void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) | 111 | void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType) |
117 | { | 112 | { |
118 | // These changes are coming from the source | 113 | // These changes are coming from the source |
119 | const auto replayToSource = false; | 114 | const auto replayToSource = false; |
@@ -123,63 +118,69 @@ void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const | |||
123 | auto type = fbb.CreateString(bufferType.toStdString()); | 118 | auto type = fbb.CreateString(bufferType.toStdString()); |
124 | auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); | 119 | auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); |
125 | Sink::Commands::FinishDeleteEntityBuffer(fbb, location); | 120 | Sink::Commands::FinishDeleteEntityBuffer(fbb, location); |
126 | callback(BufferUtils::extractBuffer(fbb)); | 121 | enqueueCommand(Sink::Commands::DeleteEntityCommand, BufferUtils::extractBuffer(fbb)); |
127 | } | 122 | } |
128 | 123 | ||
129 | void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists) | 124 | void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists) |
130 | { | 125 | { |
131 | entryGenerator([this, bufferType, &exists](const QByteArray &key) { | 126 | entryGenerator([this, bufferType, &exists](const QByteArray &sinkId) { |
132 | auto sinkId = Sink::Storage::uidFromKey(key); | ||
133 | const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); | 127 | const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); |
134 | SinkTrace() << "Checking for removal " << key << remoteId; | 128 | SinkTrace() << "Checking for removal " << sinkId << remoteId; |
135 | // If we have no remoteId, the entity hasn't been replayed to the source yet | 129 | // If we have no remoteId, the entity hasn't been replayed to the source yet |
136 | if (!remoteId.isEmpty()) { | 130 | if (!remoteId.isEmpty()) { |
137 | if (!exists(remoteId)) { | 131 | if (!exists(remoteId)) { |
138 | SinkTrace() << "Found a removed entity: " << sinkId; | 132 | SinkTrace() << "Found a removed entity: " << sinkId; |
139 | deleteEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, | 133 | deleteEntity(sinkId, mEntityStore->maxRevision(), bufferType); |
140 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); }); | ||
141 | } | 134 | } |
142 | } | 135 | } |
143 | }); | 136 | }); |
144 | } | 137 | } |
145 | 138 | ||
146 | void Synchronizer::modify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) | 139 | void Synchronizer::scanForRemovals(const QByteArray &bufferType, std::function<bool(const QByteArray &remoteId)> exists) |
147 | { | 140 | { |
148 | auto mainDatabase = Storage::mainDatabase(transaction(), bufferType); | 141 | scanForRemovals(bufferType, |
149 | const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); | 142 | [this, &bufferType](const std::function<void(const QByteArray &)> &callback) { |
150 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); | 143 | store().readAllUids(bufferType, [callback](const QByteArray &uid) { |
151 | Q_ASSERT(adaptorFactory); | 144 | callback(uid); |
152 | qint64 retrievedRevision = 0; | 145 | }); |
153 | if (auto current = EntityReaderUtils::getLatest(mainDatabase, sinkId, *adaptorFactory, retrievedRevision)) { | 146 | }, |
147 | exists | ||
148 | ); | ||
149 | } | ||
150 | |||
151 | void Synchronizer::modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity) | ||
152 | { | ||
153 | store.readLatest(bufferType, sinkId, [&, this](const Sink::ApplicationDomain::ApplicationDomainType ¤t) { | ||
154 | bool changed = false; | 154 | bool changed = false; |
155 | for (const auto &property : entity.changedProperties()) { | 155 | for (const auto &property : entity.changedProperties()) { |
156 | if (entity.getProperty(property) != current->getProperty(property)) { | 156 | if (entity.getProperty(property) != current.getProperty(property)) { |
157 | SinkTrace() << "Property changed " << sinkId << property; | 157 | SinkTrace() << "Property changed " << sinkId << property; |
158 | changed = true; | 158 | changed = true; |
159 | } | 159 | } |
160 | } | 160 | } |
161 | if (changed) { | 161 | if (changed) { |
162 | SinkTrace() << "Found a modified entity: " << remoteId; | 162 | SinkTrace() << "Found a modified entity: " << sinkId; |
163 | modifyEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, entity, *adaptorFactory, | 163 | modifyEntity(sinkId, store.maxRevision(), bufferType, entity); |
164 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); | ||
165 | } | 164 | } |
166 | } else { | 165 | }); |
167 | SinkWarning() << "Failed to get current entity"; | 166 | } |
168 | } | 167 | |
168 | void Synchronizer::modify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) | ||
169 | { | ||
170 | const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); | ||
171 | Storage::EntityStore store(mResourceContext); | ||
172 | modifyIfChanged(store, bufferType, sinkId, entity); | ||
169 | } | 173 | } |
170 | 174 | ||
171 | void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) | 175 | void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) |
172 | { | 176 | { |
173 | SinkTrace() << "Create or modify" << bufferType << remoteId; | 177 | SinkTrace() << "Create or modify" << bufferType << remoteId; |
174 | auto mainDatabase = Storage::mainDatabase(transaction(), bufferType); | 178 | Storage::EntityStore store(mResourceContext); |
175 | const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); | 179 | const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); |
176 | const auto found = mainDatabase.contains(sinkId); | 180 | const auto found = store.contains(bufferType, sinkId); |
177 | if (!found) { | 181 | if (!found) { |
178 | SinkTrace() << "Found a new entity: " << remoteId; | 182 | SinkTrace() << "Found a new entity: " << remoteId; |
179 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); | 183 | createEntity(sinkId, bufferType, entity); |
180 | Q_ASSERT(adaptorFactory); | ||
181 | createEntity( | ||
182 | sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); | ||
183 | } else { // modification | 184 | } else { // modification |
184 | modify(bufferType, remoteId, entity); | 185 | modify(bufferType, remoteId, entity); |
185 | } | 186 | } |
@@ -190,10 +191,9 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray | |||
190 | { | 191 | { |
191 | 192 | ||
192 | SinkTrace() << "Create or modify" << bufferType << remoteId; | 193 | SinkTrace() << "Create or modify" << bufferType << remoteId; |
193 | auto mainDatabase = Storage::mainDatabase(transaction(), bufferType); | ||
194 | const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); | 194 | const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); |
195 | const auto found = mainDatabase.contains(sinkId); | 195 | Storage::EntityStore store(mResourceContext); |
196 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); | 196 | const auto found = store.contains(bufferType, sinkId); |
197 | if (!found) { | 197 | if (!found) { |
198 | if (!mergeCriteria.isEmpty()) { | 198 | if (!mergeCriteria.isEmpty()) { |
199 | Sink::Query query; | 199 | Sink::Query query; |
@@ -201,7 +201,8 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray | |||
201 | query.filter(it.key(), it.value()); | 201 | query.filter(it.key(), it.value()); |
202 | } | 202 | } |
203 | bool merge = false; | 203 | bool merge = false; |
204 | Sink::EntityReader<DomainType> reader(mResourceType, mResourceInstanceIdentifier, transaction()); | 204 | Storage::EntityStore store(mResourceContext); |
205 | Sink::EntityReader<DomainType> reader(store); | ||
205 | reader.query(query, | 206 | reader.query(query, |
206 | [this, bufferType, remoteId, &merge](const DomainType &o) -> bool{ | 207 | [this, bufferType, remoteId, &merge](const DomainType &o) -> bool{ |
207 | merge = true; | 208 | merge = true; |
@@ -211,43 +212,21 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray | |||
211 | }); | 212 | }); |
212 | if (!merge) { | 213 | if (!merge) { |
213 | SinkTrace() << "Found a new entity: " << remoteId; | 214 | SinkTrace() << "Found a new entity: " << remoteId; |
214 | createEntity( | 215 | createEntity(sinkId, bufferType, entity); |
215 | sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); | ||
216 | } | 216 | } |
217 | } else { | 217 | } else { |
218 | SinkTrace() << "Found a new entity: " << remoteId; | 218 | SinkTrace() << "Found a new entity: " << remoteId; |
219 | createEntity( | 219 | createEntity(sinkId, bufferType, entity); |
220 | sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); | ||
221 | } | 220 | } |
222 | } else { // modification | 221 | } else { // modification |
223 | qint64 retrievedRevision = 0; | 222 | modifyIfChanged(store, bufferType, sinkId, entity); |
224 | if (auto current = EntityReaderUtils::getLatest(mainDatabase, sinkId, *adaptorFactory, retrievedRevision)) { | ||
225 | bool changed = false; | ||
226 | for (const auto &property : entity.changedProperties()) { | ||
227 | if (entity.getProperty(property) != current->getProperty(property)) { | ||
228 | SinkTrace() << "Property changed " << sinkId << property; | ||
229 | changed = true; | ||
230 | } | ||
231 | } | ||
232 | if (changed) { | ||
233 | SinkTrace() << "Found a modified entity: " << remoteId; | ||
234 | modifyEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, entity, *adaptorFactory, | ||
235 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); | ||
236 | } | ||
237 | } else { | ||
238 | SinkWarning() << "Failed to get current entity"; | ||
239 | } | ||
240 | } | 223 | } |
241 | } | 224 | } |
242 | 225 | ||
243 | template<typename DomainType> | 226 | template<typename DomainType> |
244 | void Synchronizer::modify(const DomainType &entity) | 227 | void Synchronizer::modify(const DomainType &entity) |
245 | { | 228 | { |
246 | const auto bufferType = ApplicationDomain::getTypeName<DomainType>(); | 229 | modifyEntity(entity.identifier(), entity.revision(), ApplicationDomain::getTypeName<DomainType>(), entity); |
247 | const auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); | ||
248 | Q_ASSERT(adaptorFactory); | ||
249 | modifyEntity(entity.identifier(), entity.revision(), bufferType, entity, *adaptorFactory, | ||
250 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); | ||
251 | } | 230 | } |
252 | 231 | ||
253 | KAsync::Job<void> Synchronizer::synchronize() | 232 | KAsync::Job<void> Synchronizer::synchronize() |
@@ -257,7 +236,6 @@ KAsync::Job<void> Synchronizer::synchronize() | |||
257 | mMessageQueue->startTransaction(); | 236 | mMessageQueue->startTransaction(); |
258 | return synchronizeWithSource().syncThen<void>([this]() { | 237 | return synchronizeWithSource().syncThen<void>([this]() { |
259 | mSyncStore.clear(); | 238 | mSyncStore.clear(); |
260 | mEntityStore.clear(); | ||
261 | mMessageQueue->commit(); | 239 | mMessageQueue->commit(); |
262 | mSyncInProgress = false; | 240 | mSyncInProgress = false; |
263 | }); | 241 | }); |
@@ -266,8 +244,7 @@ KAsync::Job<void> Synchronizer::synchronize() | |||
266 | void Synchronizer::commit() | 244 | void Synchronizer::commit() |
267 | { | 245 | { |
268 | mMessageQueue->commit(); | 246 | mMessageQueue->commit(); |
269 | mTransaction.abort(); | 247 | mEntityStore->abortTransaction(); |
270 | mEntityStore.clear(); | ||
271 | mSyncTransaction.commit(); | 248 | mSyncTransaction.commit(); |
272 | mSyncStore.clear(); | 249 | mSyncStore.clear(); |
273 | if (mSyncInProgress) { | 250 | if (mSyncInProgress) { |
@@ -275,20 +252,11 @@ void Synchronizer::commit() | |||
275 | } | 252 | } |
276 | } | 253 | } |
277 | 254 | ||
278 | Sink::Storage::Transaction &Synchronizer::transaction() | 255 | Sink::Storage::DataStore::DataStore::Transaction &Synchronizer::syncTransaction() |
279 | { | ||
280 | if (!mTransaction) { | ||
281 | SinkTrace() << "Starting transaction"; | ||
282 | mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); | ||
283 | } | ||
284 | return mTransaction; | ||
285 | } | ||
286 | |||
287 | Sink::Storage::Transaction &Synchronizer::syncTransaction() | ||
288 | { | 256 | { |
289 | if (!mSyncTransaction) { | 257 | if (!mSyncTransaction) { |
290 | SinkTrace() << "Starting transaction"; | 258 | SinkTrace() << "Starting transaction"; |
291 | mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); | 259 | mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::DataStore::ReadWrite); |
292 | } | 260 | } |
293 | return mSyncTransaction; | 261 | return mSyncTransaction; |
294 | } | 262 | } |