1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
|
/*
* Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) version 3, or any
* later version accepted by the membership of KDE e.V. (or its
* successor approved by the membership of KDE e.V.), which shall
* act as a proxy defined in Section 6 of version 3 of the license.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "sink_export.h"
#include <QObject>
#include <QStack>
#include <KAsync/Async>
#include <domainadaptor.h>
#include <query.h>
#include <messagequeue.h>
#include <storage.h>
#include <storage/entitystore.h>
#include "changereplay.h"
#include "synchronizerstore.h"
namespace Sink {
class SynchronizerStore;
/**
* Synchronize and add what we don't already have to local queue
*/
class SINK_EXPORT Synchronizer : public ChangeReplay
{
Q_OBJECT
public:
Synchronizer(const Sink::ResourceContext &resourceContext);
virtual ~Synchronizer();
void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback, MessageQueue &messageQueue);
void synchronize(const Sink::QueryBase &query);
void flush(int commandId, const QByteArray &flushId);
//Read only access to main storage
Storage::EntityStore &store();
//Read/Write access to sync storage
SynchronizerStore &syncStore();
void commit();
Sink::Storage::DataStore::Transaction &syncTransaction();
bool allChangesReplayed() Q_DECL_OVERRIDE;
void flushComplete(const QByteArray &flushId);
signals:
void notify(Notification);
public slots:
virtual void revisionChanged() Q_DECL_OVERRIDE;
protected:
///Base implementation calls the replay$Type calls
KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE;
virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE;
protected:
///Implement to write back changes to the server
virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Contact &, Sink::Operation, const QByteArray &oldRemoteId, const QList<QByteArray> &);
virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Addressbook &, Sink::Operation, const QByteArray &oldRemoteId, const QList<QByteArray> &);
virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId, const QList<QByteArray> &);
virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId, const QList<QByteArray> &);
protected:
///Calls the callback to enqueue the command
void enqueueCommand(int commandId, const QByteArray &data);
void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject);
void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, const QByteArray &newResource = QByteArray(), bool remove = false);
void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType);
/**
* A synchronous algorithm to remove entities that are no longer existing.
*
* A list of entities is generated by @param entryGenerator.
* The entiry Generator typically iterates over an index to produce all existing entries.
* This algorithm calls @param exists for every entity of type @param type, with its remoteId. For every entity where @param exists returns false,
* an entity delete command is enqueued.
*
* All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous.
*/
void scanForRemovals(const QByteArray &bufferType,
const std::function<void(const std::function<void(const QByteArray &sinkId)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists);
void scanForRemovals(const QByteArray &bufferType, std::function<bool(const QByteArray &remoteId)> exists);
/**
* An algorithm to create or modify the entity.
*
* Depending on whether the entity is locally available, or has changed.
*/
void createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity);
template <typename DomainType>
void createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const DomainType &entity, const QHash<QByteArray, Sink::Query::Comparator> &mergeCriteria);
void modify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity);
// template <typename DomainType>
// void create(const DomainType &entity);
template <typename DomainType>
void modify(const DomainType &entity, const QByteArray &newResource = QByteArray(), bool remove = false);
// template <typename DomainType>
// void remove(const DomainType &entity);
QByteArrayList resolveFilter(const QueryBase::Comparator &filter);
virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) = 0;
struct SyncRequest {
enum RequestType {
Synchronization,
ChangeReplay,
Flush
};
enum RequestOptions {
NoOptions,
RequestFlush
};
SyncRequest(const Sink::QueryBase &q, const QByteArray &requestId_ = QByteArray(), RequestOptions o = NoOptions)
: requestId(requestId_),
requestType(Synchronization),
options(o),
query(q),
applicableEntities(q.ids())
{
}
SyncRequest(RequestType type)
: requestType(type)
{
}
SyncRequest(RequestType type, int flushType_, const QByteArray &requestId_)
: flushType(flushType_),
requestId(requestId_),
requestType(type)
{
}
int flushType = 0;
QByteArray requestId;
RequestType requestType;
RequestOptions options = NoOptions;
Sink::QueryBase query;
QByteArrayList applicableEntities;
};
/**
* This allows the synchronizer to turn a single query into multiple synchronization requests.
*
* The idea is the following;
* The input query is a specification by the application of what data needs to be made available.
* Requests could be:
* * Give me everything (signified by the default constructed/empty query)
* * Give me all mails of folder X
* * Give me all mails of folders matching some constraints
*
* getSyncRequests allows the resource implementation to apply it's own defaults to that request;
* * While a maildir resource might give you always all emails of a folder, an IMAP resource might have a date limit, to i.e. only retrieve the last 14 days worth of data.
* * A resource get's to define what "give me everything" means. For email that may be turned into first a requests for folders, and then a request for all emails in those folders.
*
* This will allow synchronizeWithSource to focus on just getting to the content.
*/
virtual QList<Synchronizer::SyncRequest> getSyncRequests(const Sink::QueryBase &query);
/**
* This allows the synchronizer to merge new requests with existing requests in the queue.
*/
virtual void mergeIntoQueue(const Synchronizer::SyncRequest &request, QList<Synchronizer::SyncRequest> &queue);
void emitNotification(Notification::NoticationType type, int code, const QString &message, const QByteArray &id = QByteArray{}, const QByteArrayList &entiteis = QByteArrayList{});
/**
* Report progress for current task
*/
void reportProgress(int progress, int total);
protected:
Sink::Log::Context mLogCtx;
private:
QStack<ApplicationDomain::Status> mCurrentState;
void setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId);
void setStatus(ApplicationDomain::Status busy, const QString &reason, const QByteArray requestId);
void resetStatus(const QByteArray requestId);
void setBusy(bool busy, const QString &reason, const QByteArray requestId);
void modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity);
KAsync::Job<void> processRequest(const SyncRequest &request);
KAsync::Job<void> processSyncQueue();
Sink::ResourceContext mResourceContext;
Sink::Storage::EntityStore::Ptr mEntityStore;
QSharedPointer<SynchronizerStore> mSyncStore;
Sink::Storage::DataStore mSyncStorage;
Sink::Storage::DataStore::Transaction mSyncTransaction;
std::function<void(int commandId, const QByteArray &data)> mEnqueue;
QList<SyncRequest> mSyncRequestQueue;
MessageQueue *mMessageQueue;
bool mSyncInProgress;
QMultiHash<QByteArray, SyncRequest> mPendingSyncRequests;
};
}
|