diff options
author | Aaron Seigo <aseigo@kde.org> | 2014-12-17 08:27:31 +0100 |
---|---|---|
committer | Aaron Seigo <aseigo@kde.org> | 2014-12-17 08:27:31 +0100 |
commit | 1c7e8fd482bb67a5487449948488bd286a3504c1 (patch) | |
tree | 38789d22037a0b2ed7550b60fd2280fa522c086d | |
parent | 7265d88245767960f1b551bb57f8f84942b898d2 (diff) | |
download | sink-1c7e8fd482bb67a5487449948488bd286a3504c1.tar.gz sink-1c7e8fd482bb67a5487449948488bd286a3504c1.zip |
a basically-working Pipeline implementation
still a skeleton rather than a full body with flesh and blood, but
it is getting there!
-rw-r--r-- | common/pipeline.cpp | 199 | ||||
-rw-r--r-- | common/pipeline.h | 80 | ||||
-rw-r--r-- | common/resource.cpp | 10 | ||||
-rw-r--r-- | common/resource.h | 7 | ||||
-rw-r--r-- | dummyresource/resourcefactory.cpp | 25 | ||||
-rw-r--r-- | dummyresource/resourcefactory.h | 8 | ||||
-rw-r--r-- | synchronizer/listener.cpp | 23 | ||||
-rw-r--r-- | synchronizer/listener.h | 1 |
8 files changed, 315 insertions, 38 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 41def7c..5606c30 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -20,7 +20,9 @@ | |||
20 | 20 | ||
21 | #include "pipeline.h" | 21 | #include "pipeline.h" |
22 | 22 | ||
23 | #include <QByteArray> | ||
23 | #include <QStandardPaths> | 24 | #include <QStandardPaths> |
25 | #include <QVector> | ||
24 | 26 | ||
25 | namespace Akonadi2 | 27 | namespace Akonadi2 |
26 | { | 28 | { |
@@ -28,47 +30,214 @@ namespace Akonadi2 | |||
28 | class Pipeline::Private | 30 | class Pipeline::Private |
29 | { | 31 | { |
30 | public: | 32 | public: |
31 | Private(const QString &storageName) | 33 | Private(const QString &resourceName) |
32 | : storage(QStandardPaths::writableLocation(QStandardPaths::QStandardPaths::GenericDataLocation) + "/akonadi2", storageName, Akonadi2::Storage::ReadWrite) | 34 | : storage(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceName), |
35 | stepScheduled(false) | ||
33 | { | 36 | { |
34 | |||
35 | } | 37 | } |
36 | 38 | ||
37 | Akonadi2::Storage storage; | 39 | Storage storage; |
40 | QVector<PipelineFilter *> nullPipeline; | ||
41 | QVector<PipelineFilter *> newPipeline; | ||
42 | QVector<PipelineFilter *> modifiedPipeline; | ||
43 | QVector<PipelineFilter *> deletedPipeline; | ||
44 | QVector<PipelineState> activePipelines; | ||
45 | bool stepScheduled; | ||
38 | }; | 46 | }; |
39 | 47 | ||
40 | Pipeline::Pipeline(const QString &storageName) | 48 | Pipeline::Pipeline(const QString &resourceName, QObject *parent) |
41 | : d(new Private(storageName)) | 49 | : QObject(parent), |
50 | d(new Private(resourceName)) | ||
42 | { | 51 | { |
43 | } | 52 | } |
44 | 53 | ||
45 | Pipeline::~Pipeline() | 54 | Pipeline::~Pipeline() |
46 | { | 55 | { |
56 | delete d; | ||
47 | } | 57 | } |
48 | 58 | ||
49 | Storage &Pipeline::storage() | 59 | Storage &Pipeline::storage() const |
50 | { | 60 | { |
51 | return d->storage; | 61 | return d->storage; |
52 | } | 62 | } |
53 | 63 | ||
54 | void Pipeline::null(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) | 64 | void Pipeline::null() |
65 | { | ||
66 | //TODO: is there really any need for the null pipeline? if so, it should be doing something ;) | ||
67 | PipelineState state(this, NullPipeline, QByteArray(), d->nullPipeline); | ||
68 | d->activePipelines << state; | ||
69 | state.step(); | ||
70 | } | ||
71 | |||
72 | void Pipeline::newEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entity) | ||
73 | { | ||
74 | PipelineState state(this, NewPipeline, key, d->newPipeline); | ||
75 | d->activePipelines << state; | ||
76 | state.step(); | ||
77 | } | ||
78 | |||
79 | void Pipeline::modifiedEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entityDelta) | ||
80 | { | ||
81 | PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline); | ||
82 | d->activePipelines << state; | ||
83 | state.step(); | ||
84 | } | ||
85 | |||
86 | void Pipeline::deletedEntity(const QByteArray &key) | ||
87 | { | ||
88 | PipelineState state(this, DeletedPipeline, key, d->deletedPipeline); | ||
89 | d->activePipelines << state; | ||
90 | state.step(); | ||
91 | } | ||
92 | |||
93 | void Pipeline::pipelineStepped(const PipelineState &state) | ||
94 | { | ||
95 | scheduleStep(); | ||
96 | } | ||
97 | |||
98 | void Pipeline::scheduleStep() | ||
99 | { | ||
100 | if (!d->stepScheduled) { | ||
101 | d->stepScheduled = true; | ||
102 | QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); | ||
103 | } | ||
104 | } | ||
105 | |||
106 | void Pipeline::stepPipelines() | ||
107 | { | ||
108 | for (PipelineState &state: d->activePipelines) { | ||
109 | if (state.isIdle()) { | ||
110 | state.step(); | ||
111 | } | ||
112 | } | ||
113 | |||
114 | d->stepScheduled = false; | ||
115 | } | ||
116 | |||
117 | void Pipeline::pipelineCompleted(const PipelineState &state) | ||
118 | { | ||
119 | //TODO finalize the datastore, inform clients of the new rev | ||
120 | const int index = d->activePipelines.indexOf(state); | ||
121 | if (index > -1) { | ||
122 | d->activePipelines.remove(index); | ||
123 | } | ||
124 | |||
125 | if (state.type() != NullPipeline) { | ||
126 | emit revisionUpdated(); | ||
127 | } | ||
128 | scheduleStep(); | ||
129 | } | ||
130 | |||
131 | |||
132 | class PipelineState::Private : public QSharedData | ||
133 | { | ||
134 | public: | ||
135 | Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<PipelineFilter *> filters) | ||
136 | : pipeline(p), | ||
137 | type(t), | ||
138 | key(k), | ||
139 | filterIt(filters), | ||
140 | idle(true) | ||
141 | {} | ||
142 | |||
143 | Private() | ||
144 | : pipeline(0), | ||
145 | filterIt(QVector<PipelineFilter *>()), | ||
146 | idle(true) | ||
147 | {} | ||
148 | |||
149 | Pipeline *pipeline; | ||
150 | Pipeline::Type type; | ||
151 | QByteArray key; | ||
152 | QVectorIterator<PipelineFilter *> filterIt; | ||
153 | bool idle; | ||
154 | }; | ||
155 | |||
156 | PipelineState::PipelineState() | ||
157 | : d(new Private()) | ||
158 | { | ||
159 | |||
160 | } | ||
161 | |||
162 | PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<PipelineFilter *> &filters) | ||
163 | : d(new Private(pipeline, type, key, filters)) | ||
164 | { | ||
165 | } | ||
166 | |||
167 | PipelineState::PipelineState(const PipelineState &other) | ||
168 | : d(other.d) | ||
169 | { | ||
170 | } | ||
171 | |||
172 | PipelineState::~PipelineState() | ||
173 | { | ||
174 | } | ||
175 | |||
176 | PipelineState &PipelineState::operator=(const PipelineState &rhs) | ||
177 | { | ||
178 | d = rhs.d; | ||
179 | return *this; | ||
180 | } | ||
181 | |||
182 | bool PipelineState::operator==(const PipelineState &rhs) | ||
183 | { | ||
184 | return d == rhs.d; | ||
185 | } | ||
186 | |||
187 | bool PipelineState::isIdle() const | ||
188 | { | ||
189 | return d->idle; | ||
190 | } | ||
191 | |||
192 | QByteArray PipelineState::key() const | ||
193 | { | ||
194 | return d->key; | ||
195 | } | ||
196 | |||
197 | Pipeline::Type PipelineState::type() const | ||
198 | { | ||
199 | return d->type; | ||
200 | } | ||
201 | |||
202 | void PipelineState::step() | ||
203 | { | ||
204 | if (!d->pipeline) { | ||
205 | return; | ||
206 | } | ||
207 | |||
208 | d->idle = false; | ||
209 | if (d->filterIt.hasNext()) { | ||
210 | d->filterIt.next()->process(*this); | ||
211 | } else { | ||
212 | d->pipeline->pipelineCompleted(*this); | ||
213 | } | ||
214 | } | ||
215 | |||
216 | void PipelineState::processingCompleted(PipelineFilter *filter) | ||
217 | { | ||
218 | if (d->pipeline && filter == d->filterIt.peekPrevious()) { | ||
219 | d->idle = true; | ||
220 | d->pipeline->pipelineStepped(*this); | ||
221 | } | ||
222 | } | ||
223 | |||
224 | PipelineFilter::PipelineFilter() | ||
225 | : d(0) | ||
55 | { | 226 | { |
56 | d->storage.write(key, keySize, buffer, bufferSize); | ||
57 | } | 227 | } |
58 | 228 | ||
59 | void Pipeline::newEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) | 229 | PipelineFilter::~PipelineFilter() |
60 | { | 230 | { |
61 | d->storage.write(key, keySize, buffer, bufferSize); | ||
62 | } | 231 | } |
63 | 232 | ||
64 | void Pipeline::modifiedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) | 233 | void PipelineFilter::process(PipelineState state) |
65 | { | 234 | { |
66 | d->storage.write(key, keySize, buffer, bufferSize); | 235 | processingCompleted(state); |
67 | } | 236 | } |
68 | 237 | ||
69 | void Pipeline::deletedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) | 238 | void PipelineFilter::processingCompleted(PipelineState state) |
70 | { | 239 | { |
71 | d->storage.write(key, keySize, buffer, bufferSize); | 240 | state.processingCompleted(this); |
72 | } | 241 | } |
73 | 242 | ||
74 | } // namespace Akonadi2 | 243 | } // namespace Akonadi2 |
diff --git a/common/pipeline.h b/common/pipeline.h index 635e630..6ad78df 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -20,23 +20,89 @@ | |||
20 | 20 | ||
21 | #pragma once | 21 | #pragma once |
22 | 22 | ||
23 | #include <flatbuffers/flatbuffers.h> | ||
24 | |||
25 | #include <QSharedDataPointer> | ||
26 | #include <QObject> | ||
27 | |||
23 | #include <akonadi2common_export.h> | 28 | #include <akonadi2common_export.h> |
24 | #include <storage.h> | 29 | #include <storage.h> |
25 | 30 | ||
26 | namespace Akonadi2 | 31 | namespace Akonadi2 |
27 | { | 32 | { |
28 | 33 | ||
29 | class AKONADI2COMMON_EXPORT Pipeline | 34 | class PipelineState; |
35 | class PipelineFilter; | ||
36 | |||
37 | class AKONADI2COMMON_EXPORT Pipeline : public QObject | ||
30 | { | 38 | { |
39 | Q_OBJECT | ||
40 | |||
31 | public: | 41 | public: |
32 | Pipeline(const QString &storagePath); | 42 | enum Type { NullPipeline, NewPipeline, ModifiedPipeline, DeletedPipeline }; |
43 | |||
44 | Pipeline(const QString &storagePath, QObject *parent = 0); | ||
33 | ~Pipeline(); | 45 | ~Pipeline(); |
34 | 46 | ||
35 | Storage &storage(); | 47 | Storage &storage() const; |
36 | void null(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); | 48 | |
37 | void newEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); | 49 | // domain objects needed here |
38 | void modifiedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); | 50 | void null(); |
39 | void deletedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); | 51 | void newEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entity); |
52 | void modifiedEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entityDelta); | ||
53 | void deletedEntity(const QByteArray &key); | ||
54 | |||
55 | Q_SIGNALS: | ||
56 | void revisionUpdated(); | ||
57 | |||
58 | private Q_SLOTS: | ||
59 | void stepPipelines(); | ||
60 | |||
61 | private: | ||
62 | void pipelineStepped(const PipelineState &state); | ||
63 | void pipelineCompleted(const PipelineState &state); | ||
64 | void scheduleStep(); | ||
65 | |||
66 | friend class PipelineState; | ||
67 | |||
68 | class Private; | ||
69 | Private * const d; | ||
70 | }; | ||
71 | |||
72 | class AKONADI2COMMON_EXPORT PipelineState | ||
73 | { | ||
74 | public: | ||
75 | // domain objects? | ||
76 | PipelineState(); | ||
77 | PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<PipelineFilter *> &filters); | ||
78 | PipelineState(const PipelineState &other); | ||
79 | ~PipelineState(); | ||
80 | |||
81 | PipelineState &operator=(const PipelineState &rhs); | ||
82 | bool operator==(const PipelineState &rhs); | ||
83 | |||
84 | bool isIdle() const; | ||
85 | QByteArray key() const; | ||
86 | Pipeline::Type type() const; | ||
87 | |||
88 | void step(); | ||
89 | void processingCompleted(PipelineFilter *filter); | ||
90 | |||
91 | private: | ||
92 | class Private; | ||
93 | QExplicitlySharedDataPointer<Private> d; | ||
94 | }; | ||
95 | |||
96 | class AKONADI2COMMON_EXPORT PipelineFilter | ||
97 | { | ||
98 | public: | ||
99 | PipelineFilter(); | ||
100 | virtual ~PipelineFilter(); | ||
101 | |||
102 | virtual void process(PipelineState state); | ||
103 | |||
104 | protected: | ||
105 | void processingCompleted(PipelineState state); | ||
40 | 106 | ||
41 | private: | 107 | private: |
42 | class Private; | 108 | class Private; |
diff --git a/common/resource.cpp b/common/resource.cpp index 11a03ca..ae28485 100644 --- a/common/resource.cpp +++ b/common/resource.cpp | |||
@@ -39,12 +39,18 @@ Resource::~Resource() | |||
39 | //delete d; | 39 | //delete d; |
40 | } | 40 | } |
41 | 41 | ||
42 | void Resource::processCommand(uint messageId, int commandId, const QByteArray &data, uint size, Pipeline *pipeline) | 42 | void Resource::processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline) |
43 | { | 43 | { |
44 | Q_UNUSED(commandId) | ||
45 | Q_UNUSED(data) | ||
46 | Q_UNUSED(size) | ||
47 | Q_UNUSED(pipeline) | ||
48 | pipeline->null(); | ||
44 | } | 49 | } |
45 | 50 | ||
46 | void Resource::synchronizeWithSource() | 51 | void Resource::synchronizeWithSource(Pipeline *pipeline) |
47 | { | 52 | { |
53 | pipeline->null(); | ||
48 | } | 54 | } |
49 | 55 | ||
50 | class ResourceFactory::Private | 56 | class ResourceFactory::Private |
diff --git a/common/resource.h b/common/resource.h index 53c0bc1..0f65e1f 100644 --- a/common/resource.h +++ b/common/resource.h | |||
@@ -18,8 +18,8 @@ | |||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | 18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. |
19 | */ | 19 | */ |
20 | 20 | ||
21 | #include <clientapi.h> | ||
22 | #include <akonadi2common_export.h> | 21 | #include <akonadi2common_export.h> |
22 | #include <clientapi.h> | ||
23 | #include <pipeline.h> | 23 | #include <pipeline.h> |
24 | 24 | ||
25 | namespace Akonadi2 | 25 | namespace Akonadi2 |
@@ -32,9 +32,8 @@ public: | |||
32 | Resource(); | 32 | Resource(); |
33 | virtual ~Resource(); | 33 | virtual ~Resource(); |
34 | 34 | ||
35 | //TODO: this will need to be async | 35 | virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); |
36 | virtual void processCommand(uint messageId, int commandId, const QByteArray &data, uint size, Pipeline *pipeline); | 36 | virtual void synchronizeWithSource(Pipeline *pipeline); |
37 | virtual void synchronizeWithSource(); | ||
38 | 37 | ||
39 | private: | 38 | private: |
40 | class Private; | 39 | class Private; |
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index 37bfdac..bd85b4f 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp | |||
@@ -19,6 +19,7 @@ | |||
19 | 19 | ||
20 | #include "resourcefactory.h" | 20 | #include "resourcefactory.h" |
21 | #include "facade.h" | 21 | #include "facade.h" |
22 | #include "dummycalendar_generated.h" | ||
22 | 23 | ||
23 | DummyResource::DummyResource() | 24 | DummyResource::DummyResource() |
24 | : Akonadi2::Resource() | 25 | : Akonadi2::Resource() |
@@ -26,9 +27,29 @@ DummyResource::DummyResource() | |||
26 | 27 | ||
27 | } | 28 | } |
28 | 29 | ||
29 | void DummyResource::synchronizeWithSource() | 30 | void DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) |
30 | { | 31 | { |
31 | // TODO populate the storage | 32 | // TODO actually populate the storage with new items |
33 | auto builder = DummyCalendar::DummyEventBuilder(m_fbb); | ||
34 | builder .add_summary(m_fbb.CreateString("summary summary!")); | ||
35 | auto buffer = builder.Finish(); | ||
36 | DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); | ||
37 | pipeline->newEntity("fakekey", m_fbb); | ||
38 | m_fbb.Clear(); | ||
39 | } | ||
40 | |||
41 | void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) | ||
42 | { | ||
43 | Q_UNUSED(commandId) | ||
44 | Q_UNUSED(data) | ||
45 | Q_UNUSED(size) | ||
46 | //TODO reallly process the commands :) | ||
47 | auto builder = DummyCalendar::DummyEventBuilder(m_fbb); | ||
48 | builder .add_summary(m_fbb.CreateString("summary summary!")); | ||
49 | auto buffer = builder.Finish(); | ||
50 | DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); | ||
51 | pipeline->newEntity("fakekey", m_fbb); | ||
52 | m_fbb.Clear(); | ||
32 | } | 53 | } |
33 | 54 | ||
34 | DummyResourceFactory::DummyResourceFactory(QObject *parent) | 55 | DummyResourceFactory::DummyResourceFactory(QObject *parent) |
diff --git a/dummyresource/resourcefactory.h b/dummyresource/resourcefactory.h index 7c40084..807a654 100644 --- a/dummyresource/resourcefactory.h +++ b/dummyresource/resourcefactory.h | |||
@@ -21,6 +21,8 @@ | |||
21 | 21 | ||
22 | #include "common/resource.h" | 22 | #include "common/resource.h" |
23 | 23 | ||
24 | #include <flatbuffers/flatbuffers.h> | ||
25 | |||
24 | //TODO: a little ugly to have this in two places, once here and once in Q_PLUGIN_METADATA | 26 | //TODO: a little ugly to have this in two places, once here and once in Q_PLUGIN_METADATA |
25 | #define PLUGIN_NAME "org.kde.dummy" | 27 | #define PLUGIN_NAME "org.kde.dummy" |
26 | 28 | ||
@@ -28,7 +30,11 @@ class DummyResource : public Akonadi2::Resource | |||
28 | { | 30 | { |
29 | public: | 31 | public: |
30 | DummyResource(); | 32 | DummyResource(); |
31 | void synchronizeWithSource(); | 33 | void synchronizeWithSource(Akonadi2::Pipeline *pipeline); |
34 | void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline); | ||
35 | |||
36 | private: | ||
37 | flatbuffers::FlatBufferBuilder m_fbb; | ||
32 | }; | 38 | }; |
33 | 39 | ||
34 | class DummyResourceFactory : public Akonadi2::ResourceFactory | 40 | class DummyResourceFactory : public Akonadi2::ResourceFactory |
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index 18442e7..328d4d6 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp | |||
@@ -38,10 +38,12 @@ Listener::Listener(const QString &resourceName, QObject *parent) | |||
38 | m_revision(0), | 38 | m_revision(0), |
39 | m_resourceName(resourceName), | 39 | m_resourceName(resourceName), |
40 | m_resource(0), | 40 | m_resource(0), |
41 | m_pipeline(new Akonadi2::Pipeline(resourceName)), | 41 | m_pipeline(new Akonadi2::Pipeline(resourceName, parent)), |
42 | m_clientBufferProcessesTimer(new QTimer(this)), | 42 | m_clientBufferProcessesTimer(new QTimer(this)), |
43 | m_messageId(0) | 43 | m_messageId(0) |
44 | { | 44 | { |
45 | connect(m_pipeline, &Akonadi2::Pipeline::revisionUpdated, | ||
46 | this, &Listener::refreshRevision); | ||
45 | connect(m_server, &QLocalServer::newConnection, | 47 | connect(m_server, &QLocalServer::newConnection, |
46 | this, &Listener::acceptConnection); | 48 | this, &Listener::acceptConnection); |
47 | Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resourceName)); | 49 | Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resourceName)); |
@@ -69,7 +71,6 @@ Listener::Listener(const QString &resourceName, QObject *parent) | |||
69 | 71 | ||
70 | Listener::~Listener() | 72 | Listener::~Listener() |
71 | { | 73 | { |
72 | delete m_pipeline; | ||
73 | } | 74 | } |
74 | 75 | ||
75 | void Listener::setRevision(unsigned long long revision) | 76 | void Listener::setRevision(unsigned long long revision) |
@@ -219,7 +220,7 @@ bool Listener::processClientBuffer(Client &client) | |||
219 | Akonadi2::Console::main()->log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); | 220 | Akonadi2::Console::main()->log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); |
220 | loadResource(); | 221 | loadResource(); |
221 | if (m_resource) { | 222 | if (m_resource) { |
222 | m_resource->synchronizeWithSource(); | 223 | m_resource->synchronizeWithSource(m_pipeline); |
223 | } | 224 | } |
224 | break; | 225 | break; |
225 | } | 226 | } |
@@ -227,14 +228,14 @@ bool Listener::processClientBuffer(Client &client) | |||
227 | case Akonadi2::Commands::DeleteEntityCommand: | 228 | case Akonadi2::Commands::DeleteEntityCommand: |
228 | case Akonadi2::Commands::ModifyEntityCommand: | 229 | case Akonadi2::Commands::ModifyEntityCommand: |
229 | case Akonadi2::Commands::CreateEntityCommand: | 230 | case Akonadi2::Commands::CreateEntityCommand: |
230 | Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %1").arg(messageId).arg(commandId).arg(client.name)); | 231 | Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); |
231 | m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline); | 232 | m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); |
232 | break; | 233 | break; |
233 | default: | 234 | default: |
234 | if (commandId > Akonadi2::Commands::CustomCommand) { | 235 | if (commandId > Akonadi2::Commands::CustomCommand) { |
235 | loadResource(); | 236 | loadResource(); |
236 | if (m_resource) { | 237 | if (m_resource) { |
237 | m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline); | 238 | m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); |
238 | } | 239 | } |
239 | } else { | 240 | } else { |
240 | //TODO: handle error: we don't know wtf this command is | 241 | //TODO: handle error: we don't know wtf this command is |
@@ -243,8 +244,9 @@ bool Listener::processClientBuffer(Client &client) | |||
243 | } | 244 | } |
244 | 245 | ||
245 | //TODO: async commands == async sendCommandCompleted | 246 | //TODO: async commands == async sendCommandCompleted |
246 | sendCommandCompleted(client, messageId); | 247 | Akonadi2::Console::main()->log(QString("\tCompleted command id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); |
247 | client.commandBuffer.remove(0, size); | 248 | client.commandBuffer.remove(0, size); |
249 | sendCommandCompleted(client, messageId); | ||
248 | return client.commandBuffer.size() >= headerSize; | 250 | return client.commandBuffer.size() >= headerSize; |
249 | } | 251 | } |
250 | 252 | ||
@@ -275,6 +277,13 @@ void Listener::sendCommandCompleted(Client &client, uint messageId) | |||
275 | m_fbb.Clear(); | 277 | m_fbb.Clear(); |
276 | } | 278 | } |
277 | 279 | ||
280 | void Listener::refreshRevision() | ||
281 | { | ||
282 | //TODO this should be coming out of m_pipeline->storage() | ||
283 | ++m_revision; | ||
284 | updateClientsWithRevision(); | ||
285 | } | ||
286 | |||
278 | void Listener::updateClientsWithRevision() | 287 | void Listener::updateClientsWithRevision() |
279 | { | 288 | { |
280 | auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); | 289 | auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); |
diff --git a/synchronizer/listener.h b/synchronizer/listener.h index b294277..357ae37 100644 --- a/synchronizer/listener.h +++ b/synchronizer/listener.h | |||
@@ -76,6 +76,7 @@ private Q_SLOTS: | |||
76 | void checkConnections(); | 76 | void checkConnections(); |
77 | void readFromSocket(); | 77 | void readFromSocket(); |
78 | void processClientBuffers(); | 78 | void processClientBuffers(); |
79 | void refreshRevision(); | ||
79 | 80 | ||
80 | private: | 81 | private: |
81 | bool processClientBuffer(Client &client); | 82 | bool processClientBuffer(Client &client); |