diff options
-rw-r--r-- | common/CMakeLists.txt | 1 | ||||
-rw-r--r-- | common/clientapi.h | 3 | ||||
-rw-r--r-- | common/commands/synchronize.fbs | 8 | ||||
-rw-r--r-- | common/resource.cpp | 5 | ||||
-rw-r--r-- | common/resource.h | 1 | ||||
-rw-r--r-- | common/resourceaccess.cpp | 8 | ||||
-rw-r--r-- | common/resourceaccess.h | 2 | ||||
-rw-r--r-- | dummyresource/facade.cpp | 8 | ||||
-rw-r--r-- | dummyresource/facade.h | 2 | ||||
-rw-r--r-- | dummyresource/resourcefactory.cpp | 7 | ||||
-rw-r--r-- | dummyresource/resourcefactory.h | 1 | ||||
-rw-r--r-- | synchronizer/listener.cpp | 41 | ||||
-rw-r--r-- | tests/dummyresourcetest.cpp | 6 |
13 files changed, 71 insertions, 22 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 3d3a2b7..18bcad0 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt | |||
@@ -7,6 +7,7 @@ generate_flatbuffers( | |||
7 | commands/handshake | 7 | commands/handshake |
8 | commands/modifyentity | 8 | commands/modifyentity |
9 | commands/revisionupdate | 9 | commands/revisionupdate |
10 | commands/synchronize | ||
10 | domain/event | 11 | domain/event |
11 | entity | 12 | entity |
12 | metadata | 13 | metadata |
diff --git a/common/clientapi.h b/common/clientapi.h index 659ae91..55fbed1 100644 --- a/common/clientapi.h +++ b/common/clientapi.h | |||
@@ -297,7 +297,7 @@ using namespace async; | |||
297 | class Query | 297 | class Query |
298 | { | 298 | { |
299 | public: | 299 | public: |
300 | Query() : syncOnDemand(true) {} | 300 | Query() : syncOnDemand(true), processAll(false) {} |
301 | //Could also be a propertyFilter | 301 | //Could also be a propertyFilter |
302 | QStringList resources; | 302 | QStringList resources; |
303 | //Could also be a propertyFilter | 303 | //Could also be a propertyFilter |
@@ -307,6 +307,7 @@ public: | |||
307 | //Properties to retrieve | 307 | //Properties to retrieve |
308 | QSet<QString> requestedProperties; | 308 | QSet<QString> requestedProperties; |
309 | bool syncOnDemand; | 309 | bool syncOnDemand; |
310 | bool processAll; | ||
310 | }; | 311 | }; |
311 | 312 | ||
312 | 313 | ||
diff --git a/common/commands/synchronize.fbs b/common/commands/synchronize.fbs new file mode 100644 index 0000000..d2d0364 --- /dev/null +++ b/common/commands/synchronize.fbs | |||
@@ -0,0 +1,8 @@ | |||
1 | namespace Akonadi2; | ||
2 | |||
3 | table Synchronize { | ||
4 | sourceSync: bool; //Synchronize with source | ||
5 | localSync: bool; //Ensure all queues are processed so the local state is up-to date. | ||
6 | } | ||
7 | |||
8 | root_type Synchronize; | ||
diff --git a/common/resource.cpp b/common/resource.cpp index db08c4f..e158a40 100644 --- a/common/resource.cpp +++ b/common/resource.cpp | |||
@@ -60,6 +60,11 @@ Async::Job<void> Resource::synchronizeWithSource(Pipeline *pipeline) | |||
60 | }); | 60 | }); |
61 | } | 61 | } |
62 | 62 | ||
63 | Async::Job<void> Resource::processAllMessages() | ||
64 | { | ||
65 | return Async::null<void>(); | ||
66 | } | ||
67 | |||
63 | class ResourceFactory::Private | 68 | class ResourceFactory::Private |
64 | { | 69 | { |
65 | public: | 70 | public: |
diff --git a/common/resource.h b/common/resource.h index 52a28a6..bcce229 100644 --- a/common/resource.h +++ b/common/resource.h | |||
@@ -35,6 +35,7 @@ public: | |||
35 | 35 | ||
36 | virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); | 36 | virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); |
37 | virtual Async::Job<void> synchronizeWithSource(Pipeline *pipeline); | 37 | virtual Async::Job<void> synchronizeWithSource(Pipeline *pipeline); |
38 | virtual Async::Job<void> processAllMessages(); | ||
38 | 39 | ||
39 | virtual void configurePipeline(Pipeline *pipeline); | 40 | virtual void configurePipeline(Pipeline *pipeline); |
40 | 41 | ||
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 73a01ca..5d067c5 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -25,6 +25,7 @@ | |||
25 | #include "common/commandcompletion_generated.h" | 25 | #include "common/commandcompletion_generated.h" |
26 | #include "common/handshake_generated.h" | 26 | #include "common/handshake_generated.h" |
27 | #include "common/revisionupdate_generated.h" | 27 | #include "common/revisionupdate_generated.h" |
28 | #include "common/synchronize_generated.h" | ||
28 | 29 | ||
29 | #include <QCoreApplication> | 30 | #include <QCoreApplication> |
30 | #include <QDebug> | 31 | #include <QDebug> |
@@ -186,9 +187,12 @@ Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu | |||
186 | }); | 187 | }); |
187 | } | 188 | } |
188 | 189 | ||
189 | Async::Job<void> ResourceAccess::synchronizeResource() | 190 | Async::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool localSync) |
190 | { | 191 | { |
191 | return sendCommand(Commands::SynchronizeCommand); | 192 | auto command = Akonadi2::CreateSynchronize(d->fbb, sourceSync, localSync); |
193 | Akonadi2::FinishSynchronizeBuffer(d->fbb, command); | ||
194 | return sendCommand(Commands::SynchronizeCommand, d->fbb); | ||
195 | d->fbb.Clear(); | ||
192 | } | 196 | } |
193 | 197 | ||
194 | void ResourceAccess::open() | 198 | void ResourceAccess::open() |
diff --git a/common/resourceaccess.h b/common/resourceaccess.h index a9e8c47..fbdd992 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h | |||
@@ -43,7 +43,7 @@ public: | |||
43 | 43 | ||
44 | Async::Job<void> sendCommand(int commandId); | 44 | Async::Job<void> sendCommand(int commandId); |
45 | Async::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); | 45 | Async::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); |
46 | Async::Job<void> synchronizeResource(); | 46 | Async::Job<void> synchronizeResource(bool remoteSync, bool localSync); |
47 | 47 | ||
48 | public Q_SLOTS: | 48 | public Q_SLOTS: |
49 | void open(); | 49 | void open(); |
diff --git a/dummyresource/facade.cpp b/dummyresource/facade.cpp index 1aaec68..fcce871 100644 --- a/dummyresource/facade.cpp +++ b/dummyresource/facade.cpp | |||
@@ -115,16 +115,16 @@ static std::function<bool(const std::string &key, DummyEvent const *buffer, Akon | |||
115 | return preparedQuery; | 115 | return preparedQuery; |
116 | } | 116 | } |
117 | 117 | ||
118 | Async::Job<void> DummyResourceFacade::synchronizeResource(bool sync) | 118 | Async::Job<void> DummyResourceFacade::synchronizeResource(bool sync, bool processAll) |
119 | { | 119 | { |
120 | //TODO check if a sync is necessary | 120 | //TODO check if a sync is necessary |
121 | //TODO Only sync what was requested | 121 | //TODO Only sync what was requested |
122 | //TODO timeout | 122 | //TODO timeout |
123 | 123 | ||
124 | if (sync) { | 124 | if (sync || processAll) { |
125 | return Async::start<void>([=](Async::Future<void> &future) { | 125 | return Async::start<void>([=](Async::Future<void> &future) { |
126 | mResourceAccess->open(); | 126 | mResourceAccess->open(); |
127 | mResourceAccess->synchronizeResource().then<void>([&future](Async::Future<void> &f) { | 127 | mResourceAccess->synchronizeResource(sync, processAll).then<void>([&future](Async::Future<void> &f) { |
128 | future.setFinished(); | 128 | future.setFinished(); |
129 | f.setFinished(); | 129 | f.setFinished(); |
130 | }).exec(); | 130 | }).exec(); |
@@ -195,7 +195,7 @@ void DummyResourceFacade::readValue(QSharedPointer<Akonadi2::Storage> storage, c | |||
195 | 195 | ||
196 | Async::Job<void> DummyResourceFacade::load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback) | 196 | Async::Job<void> DummyResourceFacade::load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback) |
197 | { | 197 | { |
198 | return synchronizeResource(query.syncOnDemand).then<void>([=](Async::Future<void> &future) { | 198 | return synchronizeResource(query.syncOnDemand, query.processAll).then<void>([=](Async::Future<void> &future) { |
199 | //Now that the sync is complete we can execute the query | 199 | //Now that the sync is complete we can execute the query |
200 | const auto preparedQuery = prepareQuery(query); | 200 | const auto preparedQuery = prepareQuery(query); |
201 | 201 | ||
diff --git a/dummyresource/facade.h b/dummyresource/facade.h index c9c8047..1f69161 100644 --- a/dummyresource/facade.h +++ b/dummyresource/facade.h | |||
@@ -44,7 +44,7 @@ public: | |||
44 | 44 | ||
45 | private: | 45 | private: |
46 | void readValue(QSharedPointer<Akonadi2::Storage> storage, const QByteArray &key, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback, std::function<bool(const std::string &key, DummyCalendar::DummyEvent const *buffer, Akonadi2::Domain::Buffer::Event const *local)>); | 46 | void readValue(QSharedPointer<Akonadi2::Storage> storage, const QByteArray &key, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback, std::function<bool(const std::string &key, DummyCalendar::DummyEvent const *buffer, Akonadi2::Domain::Buffer::Event const *local)>); |
47 | Async::Job<void> synchronizeResource(bool sync); | 47 | Async::Job<void> synchronizeResource(bool sync, bool processAll); |
48 | QSharedPointer<Akonadi2::ResourceAccess> mResourceAccess; | 48 | QSharedPointer<Akonadi2::ResourceAccess> mResourceAccess; |
49 | QSharedPointer<DomainTypeAdaptorFactory<Akonadi2::Domain::Event, Akonadi2::Domain::Buffer::Event, DummyCalendar::DummyEvent> > mFactory; | 49 | QSharedPointer<DomainTypeAdaptorFactory<Akonadi2::Domain::Event, Akonadi2::Domain::Buffer::Event, DummyCalendar::DummyEvent> > mFactory; |
50 | }; | 50 | }; |
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index 18083cb..f510cd5 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp | |||
@@ -387,7 +387,14 @@ Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli | |||
387 | } | 387 | } |
388 | } | 388 | } |
389 | //TODO find items to remove | 389 | //TODO find items to remove |
390 | qDebug() << "sync complete"; | ||
391 | f.setFinished(); | ||
392 | }); | ||
393 | } | ||
390 | 394 | ||
395 | Async::Job<void> DummyResource::processAllMessages() | ||
396 | { | ||
397 | return Async::start<void>([this](Async::Future<void> &f) { | ||
391 | //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. | 398 | //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. |
392 | //TODO: report errors while processing sync? | 399 | //TODO: report errors while processing sync? |
393 | if (mSynchronizerQueue.isEmpty()) { | 400 | if (mSynchronizerQueue.isEmpty()) { |
diff --git a/dummyresource/resourcefactory.h b/dummyresource/resourcefactory.h index 767eba1..3b99d5e 100644 --- a/dummyresource/resourcefactory.h +++ b/dummyresource/resourcefactory.h | |||
@@ -35,6 +35,7 @@ class DummyResource : public Akonadi2::Resource | |||
35 | public: | 35 | public: |
36 | DummyResource(); | 36 | DummyResource(); |
37 | Async::Job<void> synchronizeWithSource(Akonadi2::Pipeline *pipeline); | 37 | Async::Job<void> synchronizeWithSource(Akonadi2::Pipeline *pipeline); |
38 | Async::Job<void> processAllMessages(); | ||
38 | void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline); | 39 | void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline); |
39 | void configurePipeline(Akonadi2::Pipeline *pipeline); | 40 | void configurePipeline(Akonadi2::Pipeline *pipeline); |
40 | int error() const; | 41 | int error() const; |
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index a84623d..6098856 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp | |||
@@ -28,6 +28,7 @@ | |||
28 | #include "common/commandcompletion_generated.h" | 28 | #include "common/commandcompletion_generated.h" |
29 | #include "common/handshake_generated.h" | 29 | #include "common/handshake_generated.h" |
30 | #include "common/revisionupdate_generated.h" | 30 | #include "common/revisionupdate_generated.h" |
31 | #include "common/synchronize_generated.h" | ||
31 | 32 | ||
32 | #include <QLocalSocket> | 33 | #include <QLocalSocket> |
33 | #include <QTimer> | 34 | #include <QTimer> |
@@ -189,18 +190,38 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin | |||
189 | break; | 190 | break; |
190 | } | 191 | } |
191 | case Akonadi2::Commands::SynchronizeCommand: { | 192 | case Akonadi2::Commands::SynchronizeCommand: { |
192 | log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); | 193 | flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); |
193 | loadResource(); | 194 | if (Akonadi2::VerifySynchronizeBuffer(verifier)) { |
194 | if (m_resource) { | 195 | auto buffer = Akonadi2::GetSynchronize(client.commandBuffer.constData()); |
195 | qDebug() << "synchronizing"; | 196 | log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); |
196 | m_resource->synchronizeWithSource(m_pipeline).then<void>([callback](Async::Future<void> &f){ | 197 | loadResource(); |
197 | //FIXME: the command is complete once all pipelines have been drained, track progress and async emit result | 198 | if (!m_resource) { |
198 | callback(); | 199 | qWarning() << "No resource loaded"; |
199 | f.setFinished(); | 200 | break; |
200 | }).exec(); | 201 | } |
202 | //TODO a more elegant composition of jobs should be possible | ||
203 | if (buffer->sourceSync()) { | ||
204 | bool localSync = buffer->localSync(); | ||
205 | m_resource->synchronizeWithSource(m_pipeline).then<void>([callback, localSync, this](Async::Future<void> &f){ | ||
206 | if (localSync) { | ||
207 | m_resource->processAllMessages().then<void>([callback](Async::Future<void> &f){ | ||
208 | callback(); | ||
209 | f.setFinished(); | ||
210 | }).exec(); | ||
211 | } else { | ||
212 | callback(); | ||
213 | f.setFinished(); | ||
214 | } | ||
215 | }).exec(); | ||
216 | } else if (buffer->localSync()) { | ||
217 | m_resource->processAllMessages().then<void>([callback](Async::Future<void> &f){ | ||
218 | callback(); | ||
219 | f.setFinished(); | ||
220 | }).exec(); | ||
221 | } | ||
201 | return; | 222 | return; |
202 | } else { | 223 | } else { |
203 | qWarning() << "No resource loaded"; | 224 | qWarning() << "received invalid command"; |
204 | } | 225 | } |
205 | break; | 226 | break; |
206 | } | 227 | } |
diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp index 4e9d0a5..5508452 100644 --- a/tests/dummyresourcetest.cpp +++ b/tests/dummyresourcetest.cpp | |||
@@ -108,12 +108,10 @@ private Q_SLOTS: | |||
108 | event.setProperty("summary", "summaryValue"); | 108 | event.setProperty("summary", "summaryValue"); |
109 | Akonadi2::Store::create<Akonadi2::Domain::Event>(event, "org.kde.dummy"); | 109 | Akonadi2::Store::create<Akonadi2::Domain::Event>(event, "org.kde.dummy"); |
110 | 110 | ||
111 | //TODO required to ensure all messages have been processed. The query should ensure this. | ||
112 | QTest::qWait(300); | ||
113 | |||
114 | Akonadi2::Query query; | 111 | Akonadi2::Query query; |
115 | query.resources << "org.kde.dummy"; | 112 | query.resources << "org.kde.dummy"; |
116 | query.syncOnDemand = false; | 113 | query.syncOnDemand = false; |
114 | query.processAll = false; | ||
117 | 115 | ||
118 | query.propertyFilter.insert("uid", "testuid"); | 116 | query.propertyFilter.insert("uid", "testuid"); |
119 | async::SyncListResult<Akonadi2::Domain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::Domain::Event>(query)); | 117 | async::SyncListResult<Akonadi2::Domain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::Domain::Event>(query)); |
@@ -141,6 +139,8 @@ private Q_SLOTS: | |||
141 | { | 139 | { |
142 | Akonadi2::Query query; | 140 | Akonadi2::Query query; |
143 | query.resources << "org.kde.dummy"; | 141 | query.resources << "org.kde.dummy"; |
142 | query.syncOnDemand = true; | ||
143 | query.processAll = true; | ||
144 | 144 | ||
145 | async::SyncListResult<Akonadi2::Domain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::Domain::Event>(query)); | 145 | async::SyncListResult<Akonadi2::Domain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::Domain::Event>(query)); |
146 | result.exec(); | 146 | result.exec(); |