summaryrefslogtreecommitdiffstats
path: root/common/genericresource.h
diff options
context:
space:
mode:
Diffstat (limited to 'common/genericresource.h')
-rw-r--r--common/genericresource.h151
1 files changed, 120 insertions, 31 deletions
diff --git a/common/genericresource.h b/common/genericresource.h
index c551e29..45d5d3a 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -24,14 +24,16 @@
24#include <messagequeue.h> 24#include <messagequeue.h>
25#include <flatbuffers/flatbuffers.h> 25#include <flatbuffers/flatbuffers.h>
26#include <domainadaptor.h> 26#include <domainadaptor.h>
27#include "changereplay.h"
28
27#include <QTimer> 29#include <QTimer>
28 30
29class CommandProcessor; 31class CommandProcessor;
30class ChangeReplay;
31 32
32namespace Sink { 33namespace Sink {
33class Pipeline; 34class Pipeline;
34class Preprocessor; 35class Preprocessor;
36class Synchronizer;
35 37
36/** 38/**
37 * Generic Resource implementation. 39 * Generic Resource implementation.
@@ -39,7 +41,7 @@ class Preprocessor;
39class SINK_EXPORT GenericResource : public Resource 41class SINK_EXPORT GenericResource : public Resource
40{ 42{
41public: 43public:
42 GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline = QSharedPointer<Pipeline>()); 44 GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline, const QSharedPointer<ChangeReplay> &changeReplay, const QSharedPointer<Synchronizer> &synchronizer);
43 virtual ~GenericResource(); 45 virtual ~GenericResource();
44 46
45 virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; 47 virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE;
@@ -64,41 +66,90 @@ protected:
64 66
65 void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors); 67 void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors);
66 68
67 ///Base implementation call the replay$Type calls
68 virtual KAsync::Job<void> replay(Sink::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value);
69 ///Implement to write back changes to the server
70 virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId);
71 virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId);
72
73 void onProcessorError(int errorCode, const QString &errorMessage); 69 void onProcessorError(int errorCode, const QString &errorMessage);
74 void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); 70 void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data);
75 71
76 static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, 72 MessageQueue mUserQueue;
77 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); 73 MessageQueue mSynchronizerQueue;
78 static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, 74 QByteArray mResourceType;
79 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); 75 QByteArray mResourceInstanceIdentifier;
80 static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback); 76 QSharedPointer<Pipeline> mPipeline;
77
78private:
79 CommandProcessor *mProcessor;
80 QSharedPointer<ChangeReplay> mChangeReplay;
81 QSharedPointer<Synchronizer> mSynchronizer;
82 int mError;
83 QTimer mCommitQueueTimer;
84 qint64 mClientLowerBoundRevision;
85 QHash<QByteArray, DomainTypeAdaptorFactoryInterface::Ptr> mAdaptorFactories;
86};
87
88class SINK_EXPORT SyncStore
89{
90public:
91 SyncStore(Sink::Storage::Transaction &);
81 92
82 /** 93 /**
83 * Records a localId to remoteId mapping 94 * Records a localId to remoteId mapping
84 */ 95 */
85 void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); 96 void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId);
86 void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); 97 void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId);
87 void updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); 98 void updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId);
88 99
89 /** 100 /**
90 * Tries to find a local id for the remote id, and creates a new local id otherwise. 101 * Tries to find a local id for the remote id, and creates a new local id otherwise.
91 * 102 *
92 * The new local id is recorded in the local to remote id mapping. 103 * The new local id is recorded in the local to remote id mapping.
93 */ 104 */
94 QByteArray resolveRemoteId(const QByteArray &type, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); 105 QByteArray resolveRemoteId(const QByteArray &type, const QByteArray &remoteId);
95 106
96 /** 107 /**
97 * Tries to find a remote id for a local id. 108 * Tries to find a remote id for a local id.
98 * 109 *
99 * This can fail if the entity hasn't been written back to the server yet. 110 * This can fail if the entity hasn't been written back to the server yet.
100 */ 111 */
101 QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Sink::Storage::Transaction &transaction); 112 QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId);
113
114private:
115 Sink::Storage::Transaction &mTransaction;
116};
117
118class SINK_EXPORT EntityStore
119{
120public:
121 EntityStore(const QByteArray &resourceType, const QByteArray &mResourceInstanceIdentifier, Sink::Storage::Transaction &transaction);
122
123 template<typename T>
124 T read(const QByteArray &identifier) const;
125
126 static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory);
127private:
128 QByteArray mResourceType;
129 QByteArray mResourceInstanceIdentifier;
130 Sink::Storage::Transaction &mTransaction;
131};
132
133/**
134 * Synchronize and add what we don't already have to local queue
135 */
136class SINK_EXPORT Synchronizer
137{
138public:
139 Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier);
140
141 void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback);
142 KAsync::Job<void> synchronize();
143
144protected:
145 ///Calls the callback to enqueue the command
146 void enqueueCommand(int commandId, const QByteArray &data);
147
148 static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject,
149 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback);
150 static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject,
151 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback);
152 static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback);
102 153
103 /** 154 /**
104 * A synchronous algorithm to remove entities that are no longer existing. 155 * A synchronous algorithm to remove entities that are no longer existing.
@@ -110,7 +161,7 @@ protected:
110 * 161 *
111 * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous. 162 * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous.
112 */ 163 */
113 void scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, 164 void scanForRemovals(const QByteArray &bufferType,
114 const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists); 165 const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists);
115 166
116 /** 167 /**
@@ -118,22 +169,60 @@ protected:
118 * 169 *
119 * Depending on whether the entity is locally available, or has changed. 170 * Depending on whether the entity is locally available, or has changed.
120 */ 171 */
121 void createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, 172 void createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity);
122 const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity);
123 173
124 static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory); 174 //Read only access to main storage
175 EntityStore &store();
125 176
126 MessageQueue mUserQueue; 177 //Read/Write access to sync storage
127 MessageQueue mSynchronizerQueue; 178 SyncStore &syncStore();
179
180 virtual KAsync::Job<void> synchronizeWithSource() = 0;
181
182private:
183 QSharedPointer<SyncStore> mSyncStore;
184 QSharedPointer<EntityStore> mEntityStore;
185 Sink::Storage mStorage;
186 Sink::Storage mSyncStorage;
187 QByteArray mResourceType;
128 QByteArray mResourceInstanceIdentifier; 188 QByteArray mResourceInstanceIdentifier;
129 QSharedPointer<Pipeline> mPipeline; 189 Sink::Storage::Transaction mTransaction;
190 Sink::Storage::Transaction mSyncTransaction;
191 std::function<void(int commandId, const QByteArray &data)> mEnqueue;
192};
193
194/**
195 * Replay changes to the source
196 */
197class SINK_EXPORT SourceWriteBack : public ChangeReplay
198{
199public:
200 SourceWriteBack(const QByteArray &resourceType,const QByteArray &resourceInstanceIdentifier);
201
202protected:
203 ///Base implementation calls the replay$Type calls
204 virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE;
205
206protected:
207 ///Implement to write back changes to the server
208 virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId);
209 virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId);
210
211 //Read only access to main storage
212 EntityStore &store();
213
214 //Read/Write access to sync storage
215 SyncStore &syncStore();
130 216
131private: 217private:
132 CommandProcessor *mProcessor; 218 Sink::Storage mSyncStorage;
133 ChangeReplay *mSourceChangeReplay; 219 QSharedPointer<SyncStore> mSyncStore;
134 int mError; 220 QSharedPointer<EntityStore> mEntityStore;
135 QTimer mCommitQueueTimer; 221 Sink::Storage::Transaction mTransaction;
136 qint64 mClientLowerBoundRevision; 222 Sink::Storage::Transaction mSyncTransaction;
137 QHash<QByteArray, DomainTypeAdaptorFactoryInterface::Ptr> mAdaptorFactories; 223 QByteArray mResourceType;
224 QByteArray mResourceInstanceIdentifier;
138}; 225};
226
227
139} 228}