diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-05-28 00:24:53 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-05-28 00:24:53 +0200 |
commit | e9c75177590d8546ebd9425f16c4269a9c92f517 (patch) | |
tree | 8a953631e467d9df50657e22bd90954b7b71c990 /common/genericresource.h | |
parent | 8f01eb530262d1442fc4fa0782a41e052412d43b (diff) | |
download | sink-e9c75177590d8546ebd9425f16c4269a9c92f517.tar.gz sink-e9c75177590d8546ebd9425f16c4269a9c92f517.zip |
Refactored the generic resource to use separate classes for
changereplay and synchronization.
This cleans up the API and avoids the excessive passing around of
transactions. It also provides more flexibility in eventually using
different synchronization strategies for different resources.
Diffstat (limited to 'common/genericresource.h')
-rw-r--r-- | common/genericresource.h | 151 |
1 files changed, 120 insertions, 31 deletions
diff --git a/common/genericresource.h b/common/genericresource.h index c551e29..45d5d3a 100644 --- a/common/genericresource.h +++ b/common/genericresource.h | |||
@@ -24,14 +24,16 @@ | |||
24 | #include <messagequeue.h> | 24 | #include <messagequeue.h> |
25 | #include <flatbuffers/flatbuffers.h> | 25 | #include <flatbuffers/flatbuffers.h> |
26 | #include <domainadaptor.h> | 26 | #include <domainadaptor.h> |
27 | #include "changereplay.h" | ||
28 | |||
27 | #include <QTimer> | 29 | #include <QTimer> |
28 | 30 | ||
29 | class CommandProcessor; | 31 | class CommandProcessor; |
30 | class ChangeReplay; | ||
31 | 32 | ||
32 | namespace Sink { | 33 | namespace Sink { |
33 | class Pipeline; | 34 | class Pipeline; |
34 | class Preprocessor; | 35 | class Preprocessor; |
36 | class Synchronizer; | ||
35 | 37 | ||
36 | /** | 38 | /** |
37 | * Generic Resource implementation. | 39 | * Generic Resource implementation. |
@@ -39,7 +41,7 @@ class Preprocessor; | |||
39 | class SINK_EXPORT GenericResource : public Resource | 41 | class SINK_EXPORT GenericResource : public Resource |
40 | { | 42 | { |
41 | public: | 43 | public: |
42 | GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline = QSharedPointer<Pipeline>()); | 44 | GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline, const QSharedPointer<ChangeReplay> &changeReplay, const QSharedPointer<Synchronizer> &synchronizer); |
43 | virtual ~GenericResource(); | 45 | virtual ~GenericResource(); |
44 | 46 | ||
45 | virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; | 47 | virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; |
@@ -64,41 +66,90 @@ protected: | |||
64 | 66 | ||
65 | void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors); | 67 | void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors); |
66 | 68 | ||
67 | ///Base implementation call the replay$Type calls | ||
68 | virtual KAsync::Job<void> replay(Sink::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value); | ||
69 | ///Implement to write back changes to the server | ||
70 | virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId); | ||
71 | virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId); | ||
72 | |||
73 | void onProcessorError(int errorCode, const QString &errorMessage); | 69 | void onProcessorError(int errorCode, const QString &errorMessage); |
74 | void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); | 70 | void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); |
75 | 71 | ||
76 | static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | 72 | MessageQueue mUserQueue; |
77 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); | 73 | MessageQueue mSynchronizerQueue; |
78 | static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | 74 | QByteArray mResourceType; |
79 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); | 75 | QByteArray mResourceInstanceIdentifier; |
80 | static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback); | 76 | QSharedPointer<Pipeline> mPipeline; |
77 | |||
78 | private: | ||
79 | CommandProcessor *mProcessor; | ||
80 | QSharedPointer<ChangeReplay> mChangeReplay; | ||
81 | QSharedPointer<Synchronizer> mSynchronizer; | ||
82 | int mError; | ||
83 | QTimer mCommitQueueTimer; | ||
84 | qint64 mClientLowerBoundRevision; | ||
85 | QHash<QByteArray, DomainTypeAdaptorFactoryInterface::Ptr> mAdaptorFactories; | ||
86 | }; | ||
87 | |||
88 | class SINK_EXPORT SyncStore | ||
89 | { | ||
90 | public: | ||
91 | SyncStore(Sink::Storage::Transaction &); | ||
81 | 92 | ||
82 | /** | 93 | /** |
83 | * Records a localId to remoteId mapping | 94 | * Records a localId to remoteId mapping |
84 | */ | 95 | */ |
85 | void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); | 96 | void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); |
86 | void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); | 97 | void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); |
87 | void updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); | 98 | void updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); |
88 | 99 | ||
89 | /** | 100 | /** |
90 | * Tries to find a local id for the remote id, and creates a new local id otherwise. | 101 | * Tries to find a local id for the remote id, and creates a new local id otherwise. |
91 | * | 102 | * |
92 | * The new local id is recorded in the local to remote id mapping. | 103 | * The new local id is recorded in the local to remote id mapping. |
93 | */ | 104 | */ |
94 | QByteArray resolveRemoteId(const QByteArray &type, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); | 105 | QByteArray resolveRemoteId(const QByteArray &type, const QByteArray &remoteId); |
95 | 106 | ||
96 | /** | 107 | /** |
97 | * Tries to find a remote id for a local id. | 108 | * Tries to find a remote id for a local id. |
98 | * | 109 | * |
99 | * This can fail if the entity hasn't been written back to the server yet. | 110 | * This can fail if the entity hasn't been written back to the server yet. |
100 | */ | 111 | */ |
101 | QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Sink::Storage::Transaction &transaction); | 112 | QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId); |
113 | |||
114 | private: | ||
115 | Sink::Storage::Transaction &mTransaction; | ||
116 | }; | ||
117 | |||
118 | class SINK_EXPORT EntityStore | ||
119 | { | ||
120 | public: | ||
121 | EntityStore(const QByteArray &resourceType, const QByteArray &mResourceInstanceIdentifier, Sink::Storage::Transaction &transaction); | ||
122 | |||
123 | template<typename T> | ||
124 | T read(const QByteArray &identifier) const; | ||
125 | |||
126 | static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory); | ||
127 | private: | ||
128 | QByteArray mResourceType; | ||
129 | QByteArray mResourceInstanceIdentifier; | ||
130 | Sink::Storage::Transaction &mTransaction; | ||
131 | }; | ||
132 | |||
133 | /** | ||
134 | * Synchronize and add what we don't already have to local queue | ||
135 | */ | ||
136 | class SINK_EXPORT Synchronizer | ||
137 | { | ||
138 | public: | ||
139 | Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier); | ||
140 | |||
141 | void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback); | ||
142 | KAsync::Job<void> synchronize(); | ||
143 | |||
144 | protected: | ||
145 | ///Calls the callback to enqueue the command | ||
146 | void enqueueCommand(int commandId, const QByteArray &data); | ||
147 | |||
148 | static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | ||
149 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); | ||
150 | static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | ||
151 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); | ||
152 | static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback); | ||
102 | 153 | ||
103 | /** | 154 | /** |
104 | * A synchronous algorithm to remove entities that are no longer existing. | 155 | * A synchronous algorithm to remove entities that are no longer existing. |
@@ -110,7 +161,7 @@ protected: | |||
110 | * | 161 | * |
111 | * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous. | 162 | * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous. |
112 | */ | 163 | */ |
113 | void scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, | 164 | void scanForRemovals(const QByteArray &bufferType, |
114 | const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists); | 165 | const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists); |
115 | 166 | ||
116 | /** | 167 | /** |
@@ -118,22 +169,60 @@ protected: | |||
118 | * | 169 | * |
119 | * Depending on whether the entity is locally available, or has changed. | 170 | * Depending on whether the entity is locally available, or has changed. |
120 | */ | 171 | */ |
121 | void createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, | 172 | void createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); |
122 | const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); | ||
123 | 173 | ||
124 | static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory); | 174 | //Read only access to main storage |
175 | EntityStore &store(); | ||
125 | 176 | ||
126 | MessageQueue mUserQueue; | 177 | //Read/Write access to sync storage |
127 | MessageQueue mSynchronizerQueue; | 178 | SyncStore &syncStore(); |
179 | |||
180 | virtual KAsync::Job<void> synchronizeWithSource() = 0; | ||
181 | |||
182 | private: | ||
183 | QSharedPointer<SyncStore> mSyncStore; | ||
184 | QSharedPointer<EntityStore> mEntityStore; | ||
185 | Sink::Storage mStorage; | ||
186 | Sink::Storage mSyncStorage; | ||
187 | QByteArray mResourceType; | ||
128 | QByteArray mResourceInstanceIdentifier; | 188 | QByteArray mResourceInstanceIdentifier; |
129 | QSharedPointer<Pipeline> mPipeline; | 189 | Sink::Storage::Transaction mTransaction; |
190 | Sink::Storage::Transaction mSyncTransaction; | ||
191 | std::function<void(int commandId, const QByteArray &data)> mEnqueue; | ||
192 | }; | ||
193 | |||
194 | /** | ||
195 | * Replay changes to the source | ||
196 | */ | ||
197 | class SINK_EXPORT SourceWriteBack : public ChangeReplay | ||
198 | { | ||
199 | public: | ||
200 | SourceWriteBack(const QByteArray &resourceType,const QByteArray &resourceInstanceIdentifier); | ||
201 | |||
202 | protected: | ||
203 | ///Base implementation calls the replay$Type calls | ||
204 | virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; | ||
205 | |||
206 | protected: | ||
207 | ///Implement to write back changes to the server | ||
208 | virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId); | ||
209 | virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId); | ||
210 | |||
211 | //Read only access to main storage | ||
212 | EntityStore &store(); | ||
213 | |||
214 | //Read/Write access to sync storage | ||
215 | SyncStore &syncStore(); | ||
130 | 216 | ||
131 | private: | 217 | private: |
132 | CommandProcessor *mProcessor; | 218 | Sink::Storage mSyncStorage; |
133 | ChangeReplay *mSourceChangeReplay; | 219 | QSharedPointer<SyncStore> mSyncStore; |
134 | int mError; | 220 | QSharedPointer<EntityStore> mEntityStore; |
135 | QTimer mCommitQueueTimer; | 221 | Sink::Storage::Transaction mTransaction; |
136 | qint64 mClientLowerBoundRevision; | 222 | Sink::Storage::Transaction mSyncTransaction; |
137 | QHash<QByteArray, DomainTypeAdaptorFactoryInterface::Ptr> mAdaptorFactories; | 223 | QByteArray mResourceType; |
224 | QByteArray mResourceInstanceIdentifier; | ||
138 | }; | 225 | }; |
226 | |||
227 | |||
139 | } | 228 | } |