diff options
Diffstat (limited to 'common/genericresource.h')
-rw-r--r-- | common/genericresource.h | 151 |
1 files changed, 120 insertions, 31 deletions
diff --git a/common/genericresource.h b/common/genericresource.h index c551e29..45d5d3a 100644 --- a/common/genericresource.h +++ b/common/genericresource.h | |||
@@ -24,14 +24,16 @@ | |||
24 | #include <messagequeue.h> | 24 | #include <messagequeue.h> |
25 | #include <flatbuffers/flatbuffers.h> | 25 | #include <flatbuffers/flatbuffers.h> |
26 | #include <domainadaptor.h> | 26 | #include <domainadaptor.h> |
27 | #include "changereplay.h" | ||
28 | |||
27 | #include <QTimer> | 29 | #include <QTimer> |
28 | 30 | ||
29 | class CommandProcessor; | 31 | class CommandProcessor; |
30 | class ChangeReplay; | ||
31 | 32 | ||
32 | namespace Sink { | 33 | namespace Sink { |
33 | class Pipeline; | 34 | class Pipeline; |
34 | class Preprocessor; | 35 | class Preprocessor; |
36 | class Synchronizer; | ||
35 | 37 | ||
36 | /** | 38 | /** |
37 | * Generic Resource implementation. | 39 | * Generic Resource implementation. |
@@ -39,7 +41,7 @@ class Preprocessor; | |||
39 | class SINK_EXPORT GenericResource : public Resource | 41 | class SINK_EXPORT GenericResource : public Resource |
40 | { | 42 | { |
41 | public: | 43 | public: |
42 | GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline = QSharedPointer<Pipeline>()); | 44 | GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline, const QSharedPointer<ChangeReplay> &changeReplay, const QSharedPointer<Synchronizer> &synchronizer); |
43 | virtual ~GenericResource(); | 45 | virtual ~GenericResource(); |
44 | 46 | ||
45 | virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; | 47 | virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; |
@@ -64,41 +66,90 @@ protected: | |||
64 | 66 | ||
65 | void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors); | 67 | void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors); |
66 | 68 | ||
67 | ///Base implementation call the replay$Type calls | ||
68 | virtual KAsync::Job<void> replay(Sink::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value); | ||
69 | ///Implement to write back changes to the server | ||
70 | virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId); | ||
71 | virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId); | ||
72 | |||
73 | void onProcessorError(int errorCode, const QString &errorMessage); | 69 | void onProcessorError(int errorCode, const QString &errorMessage); |
74 | void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); | 70 | void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); |
75 | 71 | ||
76 | static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | 72 | MessageQueue mUserQueue; |
77 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); | 73 | MessageQueue mSynchronizerQueue; |
78 | static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | 74 | QByteArray mResourceType; |
79 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); | 75 | QByteArray mResourceInstanceIdentifier; |
80 | static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback); | 76 | QSharedPointer<Pipeline> mPipeline; |
77 | |||
78 | private: | ||
79 | CommandProcessor *mProcessor; | ||
80 | QSharedPointer<ChangeReplay> mChangeReplay; | ||
81 | QSharedPointer<Synchronizer> mSynchronizer; | ||
82 | int mError; | ||
83 | QTimer mCommitQueueTimer; | ||
84 | qint64 mClientLowerBoundRevision; | ||
85 | QHash<QByteArray, DomainTypeAdaptorFactoryInterface::Ptr> mAdaptorFactories; | ||
86 | }; | ||
87 | |||
88 | class SINK_EXPORT SyncStore | ||
89 | { | ||
90 | public: | ||
91 | SyncStore(Sink::Storage::Transaction &); | ||
81 | 92 | ||
82 | /** | 93 | /** |
83 | * Records a localId to remoteId mapping | 94 | * Records a localId to remoteId mapping |
84 | */ | 95 | */ |
85 | void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); | 96 | void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); |
86 | void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); | 97 | void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); |
87 | void updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); | 98 | void updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); |
88 | 99 | ||
89 | /** | 100 | /** |
90 | * Tries to find a local id for the remote id, and creates a new local id otherwise. | 101 | * Tries to find a local id for the remote id, and creates a new local id otherwise. |
91 | * | 102 | * |
92 | * The new local id is recorded in the local to remote id mapping. | 103 | * The new local id is recorded in the local to remote id mapping. |
93 | */ | 104 | */ |
94 | QByteArray resolveRemoteId(const QByteArray &type, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); | 105 | QByteArray resolveRemoteId(const QByteArray &type, const QByteArray &remoteId); |
95 | 106 | ||
96 | /** | 107 | /** |
97 | * Tries to find a remote id for a local id. | 108 | * Tries to find a remote id for a local id. |
98 | * | 109 | * |
99 | * This can fail if the entity hasn't been written back to the server yet. | 110 | * This can fail if the entity hasn't been written back to the server yet. |
100 | */ | 111 | */ |
101 | QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Sink::Storage::Transaction &transaction); | 112 | QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId); |
113 | |||
114 | private: | ||
115 | Sink::Storage::Transaction &mTransaction; | ||
116 | }; | ||
117 | |||
118 | class SINK_EXPORT EntityStore | ||
119 | { | ||
120 | public: | ||
121 | EntityStore(const QByteArray &resourceType, const QByteArray &mResourceInstanceIdentifier, Sink::Storage::Transaction &transaction); | ||
122 | |||
123 | template<typename T> | ||
124 | T read(const QByteArray &identifier) const; | ||
125 | |||
126 | static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory); | ||
127 | private: | ||
128 | QByteArray mResourceType; | ||
129 | QByteArray mResourceInstanceIdentifier; | ||
130 | Sink::Storage::Transaction &mTransaction; | ||
131 | }; | ||
132 | |||
133 | /** | ||
134 | * Synchronize and add what we don't already have to local queue | ||
135 | */ | ||
136 | class SINK_EXPORT Synchronizer | ||
137 | { | ||
138 | public: | ||
139 | Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier); | ||
140 | |||
141 | void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback); | ||
142 | KAsync::Job<void> synchronize(); | ||
143 | |||
144 | protected: | ||
145 | ///Calls the callback to enqueue the command | ||
146 | void enqueueCommand(int commandId, const QByteArray &data); | ||
147 | |||
148 | static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | ||
149 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); | ||
150 | static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | ||
151 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); | ||
152 | static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback); | ||
102 | 153 | ||
103 | /** | 154 | /** |
104 | * A synchronous algorithm to remove entities that are no longer existing. | 155 | * A synchronous algorithm to remove entities that are no longer existing. |
@@ -110,7 +161,7 @@ protected: | |||
110 | * | 161 | * |
111 | * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous. | 162 | * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous. |
112 | */ | 163 | */ |
113 | void scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, | 164 | void scanForRemovals(const QByteArray &bufferType, |
114 | const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists); | 165 | const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists); |
115 | 166 | ||
116 | /** | 167 | /** |
@@ -118,22 +169,60 @@ protected: | |||
118 | * | 169 | * |
119 | * Depending on whether the entity is locally available, or has changed. | 170 | * Depending on whether the entity is locally available, or has changed. |
120 | */ | 171 | */ |
121 | void createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, | 172 | void createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); |
122 | const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); | ||
123 | 173 | ||
124 | static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory); | 174 | //Read only access to main storage |
175 | EntityStore &store(); | ||
125 | 176 | ||
126 | MessageQueue mUserQueue; | 177 | //Read/Write access to sync storage |
127 | MessageQueue mSynchronizerQueue; | 178 | SyncStore &syncStore(); |
179 | |||
180 | virtual KAsync::Job<void> synchronizeWithSource() = 0; | ||
181 | |||
182 | private: | ||
183 | QSharedPointer<SyncStore> mSyncStore; | ||
184 | QSharedPointer<EntityStore> mEntityStore; | ||
185 | Sink::Storage mStorage; | ||
186 | Sink::Storage mSyncStorage; | ||
187 | QByteArray mResourceType; | ||
128 | QByteArray mResourceInstanceIdentifier; | 188 | QByteArray mResourceInstanceIdentifier; |
129 | QSharedPointer<Pipeline> mPipeline; | 189 | Sink::Storage::Transaction mTransaction; |
190 | Sink::Storage::Transaction mSyncTransaction; | ||
191 | std::function<void(int commandId, const QByteArray &data)> mEnqueue; | ||
192 | }; | ||
193 | |||
194 | /** | ||
195 | * Replay changes to the source | ||
196 | */ | ||
197 | class SINK_EXPORT SourceWriteBack : public ChangeReplay | ||
198 | { | ||
199 | public: | ||
200 | SourceWriteBack(const QByteArray &resourceType,const QByteArray &resourceInstanceIdentifier); | ||
201 | |||
202 | protected: | ||
203 | ///Base implementation calls the replay$Type calls | ||
204 | virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; | ||
205 | |||
206 | protected: | ||
207 | ///Implement to write back changes to the server | ||
208 | virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId); | ||
209 | virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId); | ||
210 | |||
211 | //Read only access to main storage | ||
212 | EntityStore &store(); | ||
213 | |||
214 | //Read/Write access to sync storage | ||
215 | SyncStore &syncStore(); | ||
130 | 216 | ||
131 | private: | 217 | private: |
132 | CommandProcessor *mProcessor; | 218 | Sink::Storage mSyncStorage; |
133 | ChangeReplay *mSourceChangeReplay; | 219 | QSharedPointer<SyncStore> mSyncStore; |
134 | int mError; | 220 | QSharedPointer<EntityStore> mEntityStore; |
135 | QTimer mCommitQueueTimer; | 221 | Sink::Storage::Transaction mTransaction; |
136 | qint64 mClientLowerBoundRevision; | 222 | Sink::Storage::Transaction mSyncTransaction; |
137 | QHash<QByteArray, DomainTypeAdaptorFactoryInterface::Ptr> mAdaptorFactories; | 223 | QByteArray mResourceType; |
224 | QByteArray mResourceInstanceIdentifier; | ||
138 | }; | 225 | }; |
226 | |||
227 | |||
139 | } | 228 | } |