summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-01-25 11:23:08 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-01-25 11:23:08 +0100
commit3fc8ce958fc244e64a3a3a92f3b1440aae04133b (patch)
tree4ba2b3ca3ee6a17e7f3e7ce67d6ca934626cad7a
parent9b744da32e64d8a6cd342faba8fc3232884d60f2 (diff)
downloadsink-3fc8ce958fc244e64a3a3a92f3b1440aae04133b.tar.gz
sink-3fc8ce958fc244e64a3a3a92f3b1440aae04133b.zip
A way to ensure all messages have been processed.
As queries become reactive this should become less important. We can then just wait until all results become available. For tests it is in either case useful though.
-rw-r--r--common/CMakeLists.txt1
-rw-r--r--common/clientapi.h3
-rw-r--r--common/commands/synchronize.fbs8
-rw-r--r--common/resource.cpp5
-rw-r--r--common/resource.h1
-rw-r--r--common/resourceaccess.cpp8
-rw-r--r--common/resourceaccess.h2
-rw-r--r--dummyresource/facade.cpp8
-rw-r--r--dummyresource/facade.h2
-rw-r--r--dummyresource/resourcefactory.cpp7
-rw-r--r--dummyresource/resourcefactory.h1
-rw-r--r--synchronizer/listener.cpp41
-rw-r--r--tests/dummyresourcetest.cpp6
13 files changed, 71 insertions, 22 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index 3d3a2b7..18bcad0 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -7,6 +7,7 @@ generate_flatbuffers(
7 commands/handshake 7 commands/handshake
8 commands/modifyentity 8 commands/modifyentity
9 commands/revisionupdate 9 commands/revisionupdate
10 commands/synchronize
10 domain/event 11 domain/event
11 entity 12 entity
12 metadata 13 metadata
diff --git a/common/clientapi.h b/common/clientapi.h
index 659ae91..55fbed1 100644
--- a/common/clientapi.h
+++ b/common/clientapi.h
@@ -297,7 +297,7 @@ using namespace async;
297class Query 297class Query
298{ 298{
299public: 299public:
300 Query() : syncOnDemand(true) {} 300 Query() : syncOnDemand(true), processAll(false) {}
301 //Could also be a propertyFilter 301 //Could also be a propertyFilter
302 QStringList resources; 302 QStringList resources;
303 //Could also be a propertyFilter 303 //Could also be a propertyFilter
@@ -307,6 +307,7 @@ public:
307 //Properties to retrieve 307 //Properties to retrieve
308 QSet<QString> requestedProperties; 308 QSet<QString> requestedProperties;
309 bool syncOnDemand; 309 bool syncOnDemand;
310 bool processAll;
310}; 311};
311 312
312 313
diff --git a/common/commands/synchronize.fbs b/common/commands/synchronize.fbs
new file mode 100644
index 0000000..d2d0364
--- /dev/null
+++ b/common/commands/synchronize.fbs
@@ -0,0 +1,8 @@
1namespace Akonadi2;
2
3table Synchronize {
4 sourceSync: bool; //Synchronize with source
5 localSync: bool; //Ensure all queues are processed so the local state is up-to date.
6}
7
8root_type Synchronize;
diff --git a/common/resource.cpp b/common/resource.cpp
index db08c4f..e158a40 100644
--- a/common/resource.cpp
+++ b/common/resource.cpp
@@ -60,6 +60,11 @@ Async::Job<void> Resource::synchronizeWithSource(Pipeline *pipeline)
60 }); 60 });
61} 61}
62 62
63Async::Job<void> Resource::processAllMessages()
64{
65 return Async::null<void>();
66}
67
63class ResourceFactory::Private 68class ResourceFactory::Private
64{ 69{
65public: 70public:
diff --git a/common/resource.h b/common/resource.h
index 52a28a6..bcce229 100644
--- a/common/resource.h
+++ b/common/resource.h
@@ -35,6 +35,7 @@ public:
35 35
36 virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); 36 virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline);
37 virtual Async::Job<void> synchronizeWithSource(Pipeline *pipeline); 37 virtual Async::Job<void> synchronizeWithSource(Pipeline *pipeline);
38 virtual Async::Job<void> processAllMessages();
38 39
39 virtual void configurePipeline(Pipeline *pipeline); 40 virtual void configurePipeline(Pipeline *pipeline);
40 41
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 73a01ca..5d067c5 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -25,6 +25,7 @@
25#include "common/commandcompletion_generated.h" 25#include "common/commandcompletion_generated.h"
26#include "common/handshake_generated.h" 26#include "common/handshake_generated.h"
27#include "common/revisionupdate_generated.h" 27#include "common/revisionupdate_generated.h"
28#include "common/synchronize_generated.h"
28 29
29#include <QCoreApplication> 30#include <QCoreApplication>
30#include <QDebug> 31#include <QDebug>
@@ -186,9 +187,12 @@ Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu
186 }); 187 });
187} 188}
188 189
189Async::Job<void> ResourceAccess::synchronizeResource() 190Async::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool localSync)
190{ 191{
191 return sendCommand(Commands::SynchronizeCommand); 192 auto command = Akonadi2::CreateSynchronize(d->fbb, sourceSync, localSync);
193 Akonadi2::FinishSynchronizeBuffer(d->fbb, command);
194 return sendCommand(Commands::SynchronizeCommand, d->fbb);
195 d->fbb.Clear();
192} 196}
193 197
194void ResourceAccess::open() 198void ResourceAccess::open()
diff --git a/common/resourceaccess.h b/common/resourceaccess.h
index a9e8c47..fbdd992 100644
--- a/common/resourceaccess.h
+++ b/common/resourceaccess.h
@@ -43,7 +43,7 @@ public:
43 43
44 Async::Job<void> sendCommand(int commandId); 44 Async::Job<void> sendCommand(int commandId);
45 Async::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); 45 Async::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb);
46 Async::Job<void> synchronizeResource(); 46 Async::Job<void> synchronizeResource(bool remoteSync, bool localSync);
47 47
48public Q_SLOTS: 48public Q_SLOTS:
49 void open(); 49 void open();
diff --git a/dummyresource/facade.cpp b/dummyresource/facade.cpp
index 1aaec68..fcce871 100644
--- a/dummyresource/facade.cpp
+++ b/dummyresource/facade.cpp
@@ -115,16 +115,16 @@ static std::function<bool(const std::string &key, DummyEvent const *buffer, Akon
115 return preparedQuery; 115 return preparedQuery;
116} 116}
117 117
118Async::Job<void> DummyResourceFacade::synchronizeResource(bool sync) 118Async::Job<void> DummyResourceFacade::synchronizeResource(bool sync, bool processAll)
119{ 119{
120 //TODO check if a sync is necessary 120 //TODO check if a sync is necessary
121 //TODO Only sync what was requested 121 //TODO Only sync what was requested
122 //TODO timeout 122 //TODO timeout
123 123
124 if (sync) { 124 if (sync || processAll) {
125 return Async::start<void>([=](Async::Future<void> &future) { 125 return Async::start<void>([=](Async::Future<void> &future) {
126 mResourceAccess->open(); 126 mResourceAccess->open();
127 mResourceAccess->synchronizeResource().then<void>([&future](Async::Future<void> &f) { 127 mResourceAccess->synchronizeResource(sync, processAll).then<void>([&future](Async::Future<void> &f) {
128 future.setFinished(); 128 future.setFinished();
129 f.setFinished(); 129 f.setFinished();
130 }).exec(); 130 }).exec();
@@ -195,7 +195,7 @@ void DummyResourceFacade::readValue(QSharedPointer<Akonadi2::Storage> storage, c
195 195
196Async::Job<void> DummyResourceFacade::load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback) 196Async::Job<void> DummyResourceFacade::load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback)
197{ 197{
198 return synchronizeResource(query.syncOnDemand).then<void>([=](Async::Future<void> &future) { 198 return synchronizeResource(query.syncOnDemand, query.processAll).then<void>([=](Async::Future<void> &future) {
199 //Now that the sync is complete we can execute the query 199 //Now that the sync is complete we can execute the query
200 const auto preparedQuery = prepareQuery(query); 200 const auto preparedQuery = prepareQuery(query);
201 201
diff --git a/dummyresource/facade.h b/dummyresource/facade.h
index c9c8047..1f69161 100644
--- a/dummyresource/facade.h
+++ b/dummyresource/facade.h
@@ -44,7 +44,7 @@ public:
44 44
45private: 45private:
46 void readValue(QSharedPointer<Akonadi2::Storage> storage, const QByteArray &key, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback, std::function<bool(const std::string &key, DummyCalendar::DummyEvent const *buffer, Akonadi2::Domain::Buffer::Event const *local)>); 46 void readValue(QSharedPointer<Akonadi2::Storage> storage, const QByteArray &key, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback, std::function<bool(const std::string &key, DummyCalendar::DummyEvent const *buffer, Akonadi2::Domain::Buffer::Event const *local)>);
47 Async::Job<void> synchronizeResource(bool sync); 47 Async::Job<void> synchronizeResource(bool sync, bool processAll);
48 QSharedPointer<Akonadi2::ResourceAccess> mResourceAccess; 48 QSharedPointer<Akonadi2::ResourceAccess> mResourceAccess;
49 QSharedPointer<DomainTypeAdaptorFactory<Akonadi2::Domain::Event, Akonadi2::Domain::Buffer::Event, DummyCalendar::DummyEvent> > mFactory; 49 QSharedPointer<DomainTypeAdaptorFactory<Akonadi2::Domain::Event, Akonadi2::Domain::Buffer::Event, DummyCalendar::DummyEvent> > mFactory;
50}; 50};
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp
index 18083cb..f510cd5 100644
--- a/dummyresource/resourcefactory.cpp
+++ b/dummyresource/resourcefactory.cpp
@@ -387,7 +387,14 @@ Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli
387 } 387 }
388 } 388 }
389 //TODO find items to remove 389 //TODO find items to remove
390 qDebug() << "sync complete";
391 f.setFinished();
392 });
393}
390 394
395Async::Job<void> DummyResource::processAllMessages()
396{
397 return Async::start<void>([this](Async::Future<void> &f) {
391 //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. 398 //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed.
392 //TODO: report errors while processing sync? 399 //TODO: report errors while processing sync?
393 if (mSynchronizerQueue.isEmpty()) { 400 if (mSynchronizerQueue.isEmpty()) {
diff --git a/dummyresource/resourcefactory.h b/dummyresource/resourcefactory.h
index 767eba1..3b99d5e 100644
--- a/dummyresource/resourcefactory.h
+++ b/dummyresource/resourcefactory.h
@@ -35,6 +35,7 @@ class DummyResource : public Akonadi2::Resource
35public: 35public:
36 DummyResource(); 36 DummyResource();
37 Async::Job<void> synchronizeWithSource(Akonadi2::Pipeline *pipeline); 37 Async::Job<void> synchronizeWithSource(Akonadi2::Pipeline *pipeline);
38 Async::Job<void> processAllMessages();
38 void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline); 39 void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline);
39 void configurePipeline(Akonadi2::Pipeline *pipeline); 40 void configurePipeline(Akonadi2::Pipeline *pipeline);
40 int error() const; 41 int error() const;
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp
index a84623d..6098856 100644
--- a/synchronizer/listener.cpp
+++ b/synchronizer/listener.cpp
@@ -28,6 +28,7 @@
28#include "common/commandcompletion_generated.h" 28#include "common/commandcompletion_generated.h"
29#include "common/handshake_generated.h" 29#include "common/handshake_generated.h"
30#include "common/revisionupdate_generated.h" 30#include "common/revisionupdate_generated.h"
31#include "common/synchronize_generated.h"
31 32
32#include <QLocalSocket> 33#include <QLocalSocket>
33#include <QTimer> 34#include <QTimer>
@@ -189,18 +190,38 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin
189 break; 190 break;
190 } 191 }
191 case Akonadi2::Commands::SynchronizeCommand: { 192 case Akonadi2::Commands::SynchronizeCommand: {
192 log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); 193 flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size);
193 loadResource(); 194 if (Akonadi2::VerifySynchronizeBuffer(verifier)) {
194 if (m_resource) { 195 auto buffer = Akonadi2::GetSynchronize(client.commandBuffer.constData());
195 qDebug() << "synchronizing"; 196 log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name));
196 m_resource->synchronizeWithSource(m_pipeline).then<void>([callback](Async::Future<void> &f){ 197 loadResource();
197 //FIXME: the command is complete once all pipelines have been drained, track progress and async emit result 198 if (!m_resource) {
198 callback(); 199 qWarning() << "No resource loaded";
199 f.setFinished(); 200 break;
200 }).exec(); 201 }
202 //TODO a more elegant composition of jobs should be possible
203 if (buffer->sourceSync()) {
204 bool localSync = buffer->localSync();
205 m_resource->synchronizeWithSource(m_pipeline).then<void>([callback, localSync, this](Async::Future<void> &f){
206 if (localSync) {
207 m_resource->processAllMessages().then<void>([callback](Async::Future<void> &f){
208 callback();
209 f.setFinished();
210 }).exec();
211 } else {
212 callback();
213 f.setFinished();
214 }
215 }).exec();
216 } else if (buffer->localSync()) {
217 m_resource->processAllMessages().then<void>([callback](Async::Future<void> &f){
218 callback();
219 f.setFinished();
220 }).exec();
221 }
201 return; 222 return;
202 } else { 223 } else {
203 qWarning() << "No resource loaded"; 224 qWarning() << "received invalid command";
204 } 225 }
205 break; 226 break;
206 } 227 }
diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp
index 4e9d0a5..5508452 100644
--- a/tests/dummyresourcetest.cpp
+++ b/tests/dummyresourcetest.cpp
@@ -108,12 +108,10 @@ private Q_SLOTS:
108 event.setProperty("summary", "summaryValue"); 108 event.setProperty("summary", "summaryValue");
109 Akonadi2::Store::create<Akonadi2::Domain::Event>(event, "org.kde.dummy"); 109 Akonadi2::Store::create<Akonadi2::Domain::Event>(event, "org.kde.dummy");
110 110
111 //TODO required to ensure all messages have been processed. The query should ensure this.
112 QTest::qWait(300);
113
114 Akonadi2::Query query; 111 Akonadi2::Query query;
115 query.resources << "org.kde.dummy"; 112 query.resources << "org.kde.dummy";
116 query.syncOnDemand = false; 113 query.syncOnDemand = false;
114 query.processAll = false;
117 115
118 query.propertyFilter.insert("uid", "testuid"); 116 query.propertyFilter.insert("uid", "testuid");
119 async::SyncListResult<Akonadi2::Domain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::Domain::Event>(query)); 117 async::SyncListResult<Akonadi2::Domain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::Domain::Event>(query));
@@ -141,6 +139,8 @@ private Q_SLOTS:
141 { 139 {
142 Akonadi2::Query query; 140 Akonadi2::Query query;
143 query.resources << "org.kde.dummy"; 141 query.resources << "org.kde.dummy";
142 query.syncOnDemand = true;
143 query.processAll = true;
144 144
145 async::SyncListResult<Akonadi2::Domain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::Domain::Event>(query)); 145 async::SyncListResult<Akonadi2::Domain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::Domain::Event>(query));
146 result.exec(); 146 result.exec();