diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-01-25 11:23:08 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-01-25 11:23:08 +0100 |
commit | 3fc8ce958fc244e64a3a3a92f3b1440aae04133b (patch) | |
tree | 4ba2b3ca3ee6a17e7f3e7ce67d6ca934626cad7a | |
parent | 9b744da32e64d8a6cd342faba8fc3232884d60f2 (diff) | |
download | sink-3fc8ce958fc244e64a3a3a92f3b1440aae04133b.tar.gz sink-3fc8ce958fc244e64a3a3a92f3b1440aae04133b.zip |
A way to ensure all messages have been processed.
As queries become reactive this should become less important. We can then just wait until
all results become available. For tests it is in either case useful though.
-rw-r--r-- | common/CMakeLists.txt | 1 | ||||
-rw-r--r-- | common/clientapi.h | 3 | ||||
-rw-r--r-- | common/commands/synchronize.fbs | 8 | ||||
-rw-r--r-- | common/resource.cpp | 5 | ||||
-rw-r--r-- | common/resource.h | 1 | ||||
-rw-r--r-- | common/resourceaccess.cpp | 8 | ||||
-rw-r--r-- | common/resourceaccess.h | 2 | ||||
-rw-r--r-- | dummyresource/facade.cpp | 8 | ||||
-rw-r--r-- | dummyresource/facade.h | 2 | ||||
-rw-r--r-- | dummyresource/resourcefactory.cpp | 7 | ||||
-rw-r--r-- | dummyresource/resourcefactory.h | 1 | ||||
-rw-r--r-- | synchronizer/listener.cpp | 41 | ||||
-rw-r--r-- | tests/dummyresourcetest.cpp | 6 |
13 files changed, 71 insertions, 22 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 3d3a2b7..18bcad0 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt | |||
@@ -7,6 +7,7 @@ generate_flatbuffers( | |||
7 | commands/handshake | 7 | commands/handshake |
8 | commands/modifyentity | 8 | commands/modifyentity |
9 | commands/revisionupdate | 9 | commands/revisionupdate |
10 | commands/synchronize | ||
10 | domain/event | 11 | domain/event |
11 | entity | 12 | entity |
12 | metadata | 13 | metadata |
diff --git a/common/clientapi.h b/common/clientapi.h index 659ae91..55fbed1 100644 --- a/common/clientapi.h +++ b/common/clientapi.h | |||
@@ -297,7 +297,7 @@ using namespace async; | |||
297 | class Query | 297 | class Query |
298 | { | 298 | { |
299 | public: | 299 | public: |
300 | Query() : syncOnDemand(true) {} | 300 | Query() : syncOnDemand(true), processAll(false) {} |
301 | //Could also be a propertyFilter | 301 | //Could also be a propertyFilter |
302 | QStringList resources; | 302 | QStringList resources; |
303 | //Could also be a propertyFilter | 303 | //Could also be a propertyFilter |
@@ -307,6 +307,7 @@ public: | |||
307 | //Properties to retrieve | 307 | //Properties to retrieve |
308 | QSet<QString> requestedProperties; | 308 | QSet<QString> requestedProperties; |
309 | bool syncOnDemand; | 309 | bool syncOnDemand; |
310 | bool processAll; | ||
310 | }; | 311 | }; |
311 | 312 | ||
312 | 313 | ||
diff --git a/common/commands/synchronize.fbs b/common/commands/synchronize.fbs new file mode 100644 index 0000000..d2d0364 --- /dev/null +++ b/common/commands/synchronize.fbs | |||
@@ -0,0 +1,8 @@ | |||
1 | namespace Akonadi2; | ||
2 | |||
3 | table Synchronize { | ||
4 | sourceSync: bool; //Synchronize with source | ||
5 | localSync: bool; //Ensure all queues are processed so the local state is up-to date. | ||
6 | } | ||
7 | |||
8 | root_type Synchronize; | ||
diff --git a/common/resource.cpp b/common/resource.cpp index db08c4f..e158a40 100644 --- a/common/resource.cpp +++ b/common/resource.cpp | |||
@@ -60,6 +60,11 @@ Async::Job<void> Resource::synchronizeWithSource(Pipeline *pipeline) | |||
60 | }); | 60 | }); |
61 | } | 61 | } |
62 | 62 | ||
63 | Async::Job<void> Resource::processAllMessages() | ||
64 | { | ||
65 | return Async::null<void>(); | ||
66 | } | ||
67 | |||
63 | class ResourceFactory::Private | 68 | class ResourceFactory::Private |
64 | { | 69 | { |
65 | public: | 70 | public: |
diff --git a/common/resource.h b/common/resource.h index 52a28a6..bcce229 100644 --- a/common/resource.h +++ b/common/resource.h | |||
@@ -35,6 +35,7 @@ public: | |||
35 | 35 | ||
36 | virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); | 36 | virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); |
37 | virtual Async::Job<void> synchronizeWithSource(Pipeline *pipeline); | 37 | virtual Async::Job<void> synchronizeWithSource(Pipeline *pipeline); |
38 | virtual Async::Job<void> processAllMessages(); | ||
38 | 39 | ||
39 | virtual void configurePipeline(Pipeline *pipeline); | 40 | virtual void configurePipeline(Pipeline *pipeline); |
40 | 41 | ||
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 73a01ca..5d067c5 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -25,6 +25,7 @@ | |||
25 | #include "common/commandcompletion_generated.h" | 25 | #include "common/commandcompletion_generated.h" |
26 | #include "common/handshake_generated.h" | 26 | #include "common/handshake_generated.h" |
27 | #include "common/revisionupdate_generated.h" | 27 | #include "common/revisionupdate_generated.h" |
28 | #include "common/synchronize_generated.h" | ||
28 | 29 | ||
29 | #include <QCoreApplication> | 30 | #include <QCoreApplication> |
30 | #include <QDebug> | 31 | #include <QDebug> |
@@ -186,9 +187,12 @@ Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu | |||
186 | }); | 187 | }); |
187 | } | 188 | } |
188 | 189 | ||
189 | Async::Job<void> ResourceAccess::synchronizeResource() | 190 | Async::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool localSync) |
190 | { | 191 | { |
191 | return sendCommand(Commands::SynchronizeCommand); | 192 | auto command = Akonadi2::CreateSynchronize(d->fbb, sourceSync, localSync); |
193 | Akonadi2::FinishSynchronizeBuffer(d->fbb, command); | ||
194 | return sendCommand(Commands::SynchronizeCommand, d->fbb); | ||
195 | d->fbb.Clear(); | ||
192 | } | 196 | } |
193 | 197 | ||
194 | void ResourceAccess::open() | 198 | void ResourceAccess::open() |
diff --git a/common/resourceaccess.h b/common/resourceaccess.h index a9e8c47..fbdd992 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h | |||
@@ -43,7 +43,7 @@ public: | |||
43 | 43 | ||
44 | Async::Job<void> sendCommand(int commandId); | 44 | Async::Job<void> sendCommand(int commandId); |
45 | Async::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); | 45 | Async::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); |
46 | Async::Job<void> synchronizeResource(); | 46 | Async::Job<void> synchronizeResource(bool remoteSync, bool localSync); |
47 | 47 | ||
48 | public Q_SLOTS: | 48 | public Q_SLOTS: |
49 | void open(); | 49 | void open(); |
diff --git a/dummyresource/facade.cpp b/dummyresource/facade.cpp index 1aaec68..fcce871 100644 --- a/dummyresource/facade.cpp +++ b/dummyresource/facade.cpp | |||
@@ -115,16 +115,16 @@ static std::function<bool(const std::string &key, DummyEvent const *buffer, Akon | |||
115 | return preparedQuery; | 115 | return preparedQuery; |
116 | } | 116 | } |
117 | 117 | ||
118 | Async::Job<void> DummyResourceFacade::synchronizeResource(bool sync) | 118 | Async::Job<void> DummyResourceFacade::synchronizeResource(bool sync, bool processAll) |
119 | { | 119 | { |
120 | //TODO check if a sync is necessary | 120 | //TODO check if a sync is necessary |
121 | //TODO Only sync what was requested | 121 | //TODO Only sync what was requested |
122 | //TODO timeout | 122 | //TODO timeout |
123 | 123 | ||
124 | if (sync) { | 124 | if (sync || processAll) { |
125 | return Async::start<void>([=](Async::Future<void> &future) { | 125 | return Async::start<void>([=](Async::Future<void> &future) { |
126 | mResourceAccess->open(); | 126 | mResourceAccess->open(); |
127 | mResourceAccess->synchronizeResource().then<void>([&future](Async::Future<void> &f) { | 127 | mResourceAccess->synchronizeResource(sync, processAll).then<void>([&future](Async::Future<void> &f) { |
128 | future.setFinished(); | 128 | future.setFinished(); |
129 | f.setFinished(); | 129 | f.setFinished(); |
130 | }).exec(); | 130 | }).exec(); |
@@ -195,7 +195,7 @@ void DummyResourceFacade::readValue(QSharedPointer<Akonadi2::Storage> storage, c | |||
195 | 195 | ||
196 | Async::Job<void> DummyResourceFacade::load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback) | 196 | Async::Job<void> DummyResourceFacade::load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback) |
197 | { | 197 | { |
198 | return synchronizeResource(query.syncOnDemand).then<void>([=](Async::Future<void> &future) { | 198 | return synchronizeResource(query.syncOnDemand, query.processAll).then<void>([=](Async::Future<void> &future) { |
199 | //Now that the sync is complete we can execute the query | 199 | //Now that the sync is complete we can execute the query |
200 | const auto preparedQuery = prepareQuery(query); | 200 | const auto preparedQuery = prepareQuery(query); |
201 | 201 | ||
diff --git a/dummyresource/facade.h b/dummyresource/facade.h index c9c8047..1f69161 100644 --- a/dummyresource/facade.h +++ b/dummyresource/facade.h | |||
@@ -44,7 +44,7 @@ public: | |||
44 | 44 | ||
45 | private: | 45 | private: |
46 | void readValue(QSharedPointer<Akonadi2::Storage> storage, const QByteArray &key, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback, std::function<bool(const std::string &key, DummyCalendar::DummyEvent const *buffer, Akonadi2::Domain::Buffer::Event const *local)>); | 46 | void readValue(QSharedPointer<Akonadi2::Storage> storage, const QByteArray &key, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback, std::function<bool(const std::string &key, DummyCalendar::DummyEvent const *buffer, Akonadi2::Domain::Buffer::Event const *local)>); |
47 | Async::Job<void> synchronizeResource(bool sync); | 47 | Async::Job<void> synchronizeResource(bool sync, bool processAll); |
48 | QSharedPointer<Akonadi2::ResourceAccess> mResourceAccess; | 48 | QSharedPointer<Akonadi2::ResourceAccess> mResourceAccess; |
49 | QSharedPointer<DomainTypeAdaptorFactory<Akonadi2::Domain::Event, Akonadi2::Domain::Buffer::Event, DummyCalendar::DummyEvent> > mFactory; | 49 | QSharedPointer<DomainTypeAdaptorFactory<Akonadi2::Domain::Event, Akonadi2::Domain::Buffer::Event, DummyCalendar::DummyEvent> > mFactory; |
50 | }; | 50 | }; |
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index 18083cb..f510cd5 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp | |||
@@ -387,7 +387,14 @@ Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli | |||
387 | } | 387 | } |
388 | } | 388 | } |
389 | //TODO find items to remove | 389 | //TODO find items to remove |
390 | qDebug() << "sync complete"; | ||
391 | f.setFinished(); | ||
392 | }); | ||
393 | } | ||
390 | 394 | ||
395 | Async::Job<void> DummyResource::processAllMessages() | ||
396 | { | ||
397 | return Async::start<void>([this](Async::Future<void> &f) { | ||
391 | //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. | 398 | //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. |
392 | //TODO: report errors while processing sync? | 399 | //TODO: report errors while processing sync? |
393 | if (mSynchronizerQueue.isEmpty()) { | 400 | if (mSynchronizerQueue.isEmpty()) { |
diff --git a/dummyresource/resourcefactory.h b/dummyresource/resourcefactory.h index 767eba1..3b99d5e 100644 --- a/dummyresource/resourcefactory.h +++ b/dummyresource/resourcefactory.h | |||
@@ -35,6 +35,7 @@ class DummyResource : public Akonadi2::Resource | |||
35 | public: | 35 | public: |
36 | DummyResource(); | 36 | DummyResource(); |
37 | Async::Job<void> synchronizeWithSource(Akonadi2::Pipeline *pipeline); | 37 | Async::Job<void> synchronizeWithSource(Akonadi2::Pipeline *pipeline); |
38 | Async::Job<void> processAllMessages(); | ||
38 | void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline); | 39 | void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline); |
39 | void configurePipeline(Akonadi2::Pipeline *pipeline); | 40 | void configurePipeline(Akonadi2::Pipeline *pipeline); |
40 | int error() const; | 41 | int error() const; |
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index a84623d..6098856 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp | |||
@@ -28,6 +28,7 @@ | |||
28 | #include "common/commandcompletion_generated.h" | 28 | #include "common/commandcompletion_generated.h" |
29 | #include "common/handshake_generated.h" | 29 | #include "common/handshake_generated.h" |
30 | #include "common/revisionupdate_generated.h" | 30 | #include "common/revisionupdate_generated.h" |
31 | #include "common/synchronize_generated.h" | ||
31 | 32 | ||
32 | #include <QLocalSocket> | 33 | #include <QLocalSocket> |
33 | #include <QTimer> | 34 | #include <QTimer> |
@@ -189,18 +190,38 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin | |||
189 | break; | 190 | break; |
190 | } | 191 | } |
191 | case Akonadi2::Commands::SynchronizeCommand: { | 192 | case Akonadi2::Commands::SynchronizeCommand: { |
192 | log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); | 193 | flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); |
193 | loadResource(); | 194 | if (Akonadi2::VerifySynchronizeBuffer(verifier)) { |
194 | if (m_resource) { | 195 | auto buffer = Akonadi2::GetSynchronize(client.commandBuffer.constData()); |
195 | qDebug() << "synchronizing"; | 196 | log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); |
196 | m_resource->synchronizeWithSource(m_pipeline).then<void>([callback](Async::Future<void> &f){ | 197 | loadResource(); |
197 | //FIXME: the command is complete once all pipelines have been drained, track progress and async emit result | 198 | if (!m_resource) { |
198 | callback(); | 199 | qWarning() << "No resource loaded"; |
199 | f.setFinished(); | 200 | break; |
200 | }).exec(); | 201 | } |
202 | //TODO a more elegant composition of jobs should be possible | ||
203 | if (buffer->sourceSync()) { | ||
204 | bool localSync = buffer->localSync(); | ||
205 | m_resource->synchronizeWithSource(m_pipeline).then<void>([callback, localSync, this](Async::Future<void> &f){ | ||
206 | if (localSync) { | ||
207 | m_resource->processAllMessages().then<void>([callback](Async::Future<void> &f){ | ||
208 | callback(); | ||
209 | f.setFinished(); | ||
210 | }).exec(); | ||
211 | } else { | ||
212 | callback(); | ||
213 | f.setFinished(); | ||
214 | } | ||
215 | }).exec(); | ||
216 | } else if (buffer->localSync()) { | ||
217 | m_resource->processAllMessages().then<void>([callback](Async::Future<void> &f){ | ||
218 | callback(); | ||
219 | f.setFinished(); | ||
220 | }).exec(); | ||
221 | } | ||
201 | return; | 222 | return; |
202 | } else { | 223 | } else { |
203 | qWarning() << "No resource loaded"; | 224 | qWarning() << "received invalid command"; |
204 | } | 225 | } |
205 | break; | 226 | break; |
206 | } | 227 | } |
diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp index 4e9d0a5..5508452 100644 --- a/tests/dummyresourcetest.cpp +++ b/tests/dummyresourcetest.cpp | |||
@@ -108,12 +108,10 @@ private Q_SLOTS: | |||
108 | event.setProperty("summary", "summaryValue"); | 108 | event.setProperty("summary", "summaryValue"); |
109 | Akonadi2::Store::create<Akonadi2::Domain::Event>(event, "org.kde.dummy"); | 109 | Akonadi2::Store::create<Akonadi2::Domain::Event>(event, "org.kde.dummy"); |
110 | 110 | ||
111 | //TODO required to ensure all messages have been processed. The query should ensure this. | ||
112 | QTest::qWait(300); | ||
113 | |||
114 | Akonadi2::Query query; | 111 | Akonadi2::Query query; |
115 | query.resources << "org.kde.dummy"; | 112 | query.resources << "org.kde.dummy"; |
116 | query.syncOnDemand = false; | 113 | query.syncOnDemand = false; |
114 | query.processAll = false; | ||
117 | 115 | ||
118 | query.propertyFilter.insert("uid", "testuid"); | 116 | query.propertyFilter.insert("uid", "testuid"); |
119 | async::SyncListResult<Akonadi2::Domain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::Domain::Event>(query)); | 117 | async::SyncListResult<Akonadi2::Domain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::Domain::Event>(query)); |
@@ -141,6 +139,8 @@ private Q_SLOTS: | |||
141 | { | 139 | { |
142 | Akonadi2::Query query; | 140 | Akonadi2::Query query; |
143 | query.resources << "org.kde.dummy"; | 141 | query.resources << "org.kde.dummy"; |
142 | query.syncOnDemand = true; | ||
143 | query.processAll = true; | ||
144 | 144 | ||
145 | async::SyncListResult<Akonadi2::Domain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::Domain::Event>(query)); | 145 | async::SyncListResult<Akonadi2::Domain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::Domain::Event>(query)); |
146 | result.exec(); | 146 | result.exec(); |