summaryrefslogtreecommitdiffstats
path: root/common/genericresource.h
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-05-28 00:24:53 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-05-28 00:24:53 +0200
commite9c75177590d8546ebd9425f16c4269a9c92f517 (patch)
tree8a953631e467d9df50657e22bd90954b7b71c990 /common/genericresource.h
parent8f01eb530262d1442fc4fa0782a41e052412d43b (diff)
downloadsink-e9c75177590d8546ebd9425f16c4269a9c92f517.tar.gz
sink-e9c75177590d8546ebd9425f16c4269a9c92f517.zip
Refactored the generic resource to use separate classes for
changereplay and synchronization. This cleans up the API and avoids the excessive passing around of transactions. It also provides more flexibility in eventually using different synchronization strategies for different resources.
Diffstat (limited to 'common/genericresource.h')
-rw-r--r--common/genericresource.h151
1 files changed, 120 insertions, 31 deletions
diff --git a/common/genericresource.h b/common/genericresource.h
index c551e29..45d5d3a 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -24,14 +24,16 @@
24#include <messagequeue.h> 24#include <messagequeue.h>
25#include <flatbuffers/flatbuffers.h> 25#include <flatbuffers/flatbuffers.h>
26#include <domainadaptor.h> 26#include <domainadaptor.h>
27#include "changereplay.h"
28
27#include <QTimer> 29#include <QTimer>
28 30
29class CommandProcessor; 31class CommandProcessor;
30class ChangeReplay;
31 32
32namespace Sink { 33namespace Sink {
33class Pipeline; 34class Pipeline;
34class Preprocessor; 35class Preprocessor;
36class Synchronizer;
35 37
36/** 38/**
37 * Generic Resource implementation. 39 * Generic Resource implementation.
@@ -39,7 +41,7 @@ class Preprocessor;
39class SINK_EXPORT GenericResource : public Resource 41class SINK_EXPORT GenericResource : public Resource
40{ 42{
41public: 43public:
42 GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline = QSharedPointer<Pipeline>()); 44 GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline, const QSharedPointer<ChangeReplay> &changeReplay, const QSharedPointer<Synchronizer> &synchronizer);
43 virtual ~GenericResource(); 45 virtual ~GenericResource();
44 46
45 virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; 47 virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE;
@@ -64,41 +66,90 @@ protected:
64 66
65 void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors); 67 void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors);
66 68
67 ///Base implementation call the replay$Type calls
68 virtual KAsync::Job<void> replay(Sink::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value);
69 ///Implement to write back changes to the server
70 virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId);
71 virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId);
72
73 void onProcessorError(int errorCode, const QString &errorMessage); 69 void onProcessorError(int errorCode, const QString &errorMessage);
74 void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); 70 void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data);
75 71
76 static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, 72 MessageQueue mUserQueue;
77 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); 73 MessageQueue mSynchronizerQueue;
78 static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, 74 QByteArray mResourceType;
79 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); 75 QByteArray mResourceInstanceIdentifier;
80 static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback); 76 QSharedPointer<Pipeline> mPipeline;
77
78private:
79 CommandProcessor *mProcessor;
80 QSharedPointer<ChangeReplay> mChangeReplay;
81 QSharedPointer<Synchronizer> mSynchronizer;
82 int mError;
83 QTimer mCommitQueueTimer;
84 qint64 mClientLowerBoundRevision;
85 QHash<QByteArray, DomainTypeAdaptorFactoryInterface::Ptr> mAdaptorFactories;
86};
87
88class SINK_EXPORT SyncStore
89{
90public:
91 SyncStore(Sink::Storage::Transaction &);
81 92
82 /** 93 /**
83 * Records a localId to remoteId mapping 94 * Records a localId to remoteId mapping
84 */ 95 */
85 void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); 96 void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId);
86 void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); 97 void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId);
87 void updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); 98 void updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId);
88 99
89 /** 100 /**
90 * Tries to find a local id for the remote id, and creates a new local id otherwise. 101 * Tries to find a local id for the remote id, and creates a new local id otherwise.
91 * 102 *
92 * The new local id is recorded in the local to remote id mapping. 103 * The new local id is recorded in the local to remote id mapping.
93 */ 104 */
94 QByteArray resolveRemoteId(const QByteArray &type, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); 105 QByteArray resolveRemoteId(const QByteArray &type, const QByteArray &remoteId);
95 106
96 /** 107 /**
97 * Tries to find a remote id for a local id. 108 * Tries to find a remote id for a local id.
98 * 109 *
99 * This can fail if the entity hasn't been written back to the server yet. 110 * This can fail if the entity hasn't been written back to the server yet.
100 */ 111 */
101 QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Sink::Storage::Transaction &transaction); 112 QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId);
113
114private:
115 Sink::Storage::Transaction &mTransaction;
116};
117
118class SINK_EXPORT EntityStore
119{
120public:
121 EntityStore(const QByteArray &resourceType, const QByteArray &mResourceInstanceIdentifier, Sink::Storage::Transaction &transaction);
122
123 template<typename T>
124 T read(const QByteArray &identifier) const;
125
126 static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory);
127private:
128 QByteArray mResourceType;
129 QByteArray mResourceInstanceIdentifier;
130 Sink::Storage::Transaction &mTransaction;
131};
132
133/**
134 * Synchronize and add what we don't already have to local queue
135 */
136class SINK_EXPORT Synchronizer
137{
138public:
139 Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier);
140
141 void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback);
142 KAsync::Job<void> synchronize();
143
144protected:
145 ///Calls the callback to enqueue the command
146 void enqueueCommand(int commandId, const QByteArray &data);
147
148 static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject,
149 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback);
150 static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject,
151 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback);
152 static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback);
102 153
103 /** 154 /**
104 * A synchronous algorithm to remove entities that are no longer existing. 155 * A synchronous algorithm to remove entities that are no longer existing.
@@ -110,7 +161,7 @@ protected:
110 * 161 *
111 * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous. 162 * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous.
112 */ 163 */
113 void scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, 164 void scanForRemovals(const QByteArray &bufferType,
114 const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists); 165 const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists);
115 166
116 /** 167 /**
@@ -118,22 +169,60 @@ protected:
118 * 169 *
119 * Depending on whether the entity is locally available, or has changed. 170 * Depending on whether the entity is locally available, or has changed.
120 */ 171 */
121 void createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, 172 void createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity);
122 const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity);
123 173
124 static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory); 174 //Read only access to main storage
175 EntityStore &store();
125 176
126 MessageQueue mUserQueue; 177 //Read/Write access to sync storage
127 MessageQueue mSynchronizerQueue; 178 SyncStore &syncStore();
179
180 virtual KAsync::Job<void> synchronizeWithSource() = 0;
181
182private:
183 QSharedPointer<SyncStore> mSyncStore;
184 QSharedPointer<EntityStore> mEntityStore;
185 Sink::Storage mStorage;
186 Sink::Storage mSyncStorage;
187 QByteArray mResourceType;
128 QByteArray mResourceInstanceIdentifier; 188 QByteArray mResourceInstanceIdentifier;
129 QSharedPointer<Pipeline> mPipeline; 189 Sink::Storage::Transaction mTransaction;
190 Sink::Storage::Transaction mSyncTransaction;
191 std::function<void(int commandId, const QByteArray &data)> mEnqueue;
192};
193
194/**
195 * Replay changes to the source
196 */
197class SINK_EXPORT SourceWriteBack : public ChangeReplay
198{
199public:
200 SourceWriteBack(const QByteArray &resourceType,const QByteArray &resourceInstanceIdentifier);
201
202protected:
203 ///Base implementation calls the replay$Type calls
204 virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE;
205
206protected:
207 ///Implement to write back changes to the server
208 virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId);
209 virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId);
210
211 //Read only access to main storage
212 EntityStore &store();
213
214 //Read/Write access to sync storage
215 SyncStore &syncStore();
130 216
131private: 217private:
132 CommandProcessor *mProcessor; 218 Sink::Storage mSyncStorage;
133 ChangeReplay *mSourceChangeReplay; 219 QSharedPointer<SyncStore> mSyncStore;
134 int mError; 220 QSharedPointer<EntityStore> mEntityStore;
135 QTimer mCommitQueueTimer; 221 Sink::Storage::Transaction mTransaction;
136 qint64 mClientLowerBoundRevision; 222 Sink::Storage::Transaction mSyncTransaction;
137 QHash<QByteArray, DomainTypeAdaptorFactoryInterface::Ptr> mAdaptorFactories; 223 QByteArray mResourceType;
224 QByteArray mResourceInstanceIdentifier;
138}; 225};
226
227
139} 228}