summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/CMakeLists.txt1
-rw-r--r--common/clientapi.h3
-rw-r--r--common/commands/synchronize.fbs8
-rw-r--r--common/resource.cpp5
-rw-r--r--common/resource.h1
-rw-r--r--common/resourceaccess.cpp8
-rw-r--r--common/resourceaccess.h2
-rw-r--r--dummyresource/facade.cpp8
-rw-r--r--dummyresource/facade.h2
-rw-r--r--dummyresource/resourcefactory.cpp7
-rw-r--r--dummyresource/resourcefactory.h1
-rw-r--r--synchronizer/listener.cpp41
-rw-r--r--tests/dummyresourcetest.cpp6
13 files changed, 71 insertions, 22 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index 3d3a2b7..18bcad0 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -7,6 +7,7 @@ generate_flatbuffers(
7 commands/handshake 7 commands/handshake
8 commands/modifyentity 8 commands/modifyentity
9 commands/revisionupdate 9 commands/revisionupdate
10 commands/synchronize
10 domain/event 11 domain/event
11 entity 12 entity
12 metadata 13 metadata
diff --git a/common/clientapi.h b/common/clientapi.h
index 659ae91..55fbed1 100644
--- a/common/clientapi.h
+++ b/common/clientapi.h
@@ -297,7 +297,7 @@ using namespace async;
297class Query 297class Query
298{ 298{
299public: 299public:
300 Query() : syncOnDemand(true) {} 300 Query() : syncOnDemand(true), processAll(false) {}
301 //Could also be a propertyFilter 301 //Could also be a propertyFilter
302 QStringList resources; 302 QStringList resources;
303 //Could also be a propertyFilter 303 //Could also be a propertyFilter
@@ -307,6 +307,7 @@ public:
307 //Properties to retrieve 307 //Properties to retrieve
308 QSet<QString> requestedProperties; 308 QSet<QString> requestedProperties;
309 bool syncOnDemand; 309 bool syncOnDemand;
310 bool processAll;
310}; 311};
311 312
312 313
diff --git a/common/commands/synchronize.fbs b/common/commands/synchronize.fbs
new file mode 100644
index 0000000..d2d0364
--- /dev/null
+++ b/common/commands/synchronize.fbs
@@ -0,0 +1,8 @@
1namespace Akonadi2;
2
3table Synchronize {
4 sourceSync: bool; //Synchronize with source
5 localSync: bool; //Ensure all queues are processed so the local state is up-to date.
6}
7
8root_type Synchronize;
diff --git a/common/resource.cpp b/common/resource.cpp
index db08c4f..e158a40 100644
--- a/common/resource.cpp
+++ b/common/resource.cpp
@@ -60,6 +60,11 @@ Async::Job<void> Resource::synchronizeWithSource(Pipeline *pipeline)
60 }); 60 });
61} 61}
62 62
63Async::Job<void> Resource::processAllMessages()
64{
65 return Async::null<void>();
66}
67
63class ResourceFactory::Private 68class ResourceFactory::Private
64{ 69{
65public: 70public:
diff --git a/common/resource.h b/common/resource.h
index 52a28a6..bcce229 100644
--- a/common/resource.h
+++ b/common/resource.h
@@ -35,6 +35,7 @@ public:
35 35
36 virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); 36 virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline);
37 virtual Async::Job<void> synchronizeWithSource(Pipeline *pipeline); 37 virtual Async::Job<void> synchronizeWithSource(Pipeline *pipeline);
38 virtual Async::Job<void> processAllMessages();
38 39
39 virtual void configurePipeline(Pipeline *pipeline); 40 virtual void configurePipeline(Pipeline *pipeline);
40 41
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 73a01ca..5d067c5 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -25,6 +25,7 @@
25#include "common/commandcompletion_generated.h" 25#include "common/commandcompletion_generated.h"
26#include "common/handshake_generated.h" 26#include "common/handshake_generated.h"
27#include "common/revisionupdate_generated.h" 27#include "common/revisionupdate_generated.h"
28#include "common/synchronize_generated.h"
28 29
29#include <QCoreApplication> 30#include <QCoreApplication>
30#include <QDebug> 31#include <QDebug>
@@ -186,9 +187,12 @@ Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu
186 }); 187 });
187} 188}
188 189
189Async::Job<void> ResourceAccess::synchronizeResource() 190Async::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool localSync)
190{ 191{
191 return sendCommand(Commands::SynchronizeCommand); 192 auto command = Akonadi2::CreateSynchronize(d->fbb, sourceSync, localSync);
193 Akonadi2::FinishSynchronizeBuffer(d->fbb, command);
194 return sendCommand(Commands::SynchronizeCommand, d->fbb);
195 d->fbb.Clear();
192} 196}
193 197
194void ResourceAccess::open() 198void ResourceAccess::open()
diff --git a/common/resourceaccess.h b/common/resourceaccess.h
index a9e8c47..fbdd992 100644
--- a/common/resourceaccess.h
+++ b/common/resourceaccess.h
@@ -43,7 +43,7 @@ public:
43 43
44 Async::Job<void> sendCommand(int commandId); 44 Async::Job<void> sendCommand(int commandId);
45 Async::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); 45 Async::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb);
46 Async::Job<void> synchronizeResource(); 46 Async::Job<void> synchronizeResource(bool remoteSync, bool localSync);
47 47
48public Q_SLOTS: 48public Q_SLOTS:
49 void open(); 49 void open();
diff --git a/dummyresource/facade.cpp b/dummyresource/facade.cpp
index 1aaec68..fcce871 100644
--- a/dummyresource/facade.cpp
+++ b/dummyresource/facade.cpp
@@ -115,16 +115,16 @@ static std::function<bool(const std::string &key, DummyEvent const *buffer, Akon
115 return preparedQuery; 115 return preparedQuery;
116} 116}
117 117
118Async::Job<void> DummyResourceFacade::synchronizeResource(bool sync) 118Async::Job<void> DummyResourceFacade::synchronizeResource(bool sync, bool processAll)
119{ 119{
120 //TODO check if a sync is necessary 120 //TODO check if a sync is necessary
121 //TODO Only sync what was requested 121 //TODO Only sync what was requested
122 //TODO timeout 122 //TODO timeout
123 123
124 if (sync) { 124 if (sync || processAll) {
125 return Async::start<void>([=](Async::Future<void> &future) { 125 return Async::start<void>([=](Async::Future<void> &future) {
126 mResourceAccess->open(); 126 mResourceAccess->open();
127 mResourceAccess->synchronizeResource().then<void>([&future](Async::Future<void> &f) { 127 mResourceAccess->synchronizeResource(sync, processAll).then<void>([&future](Async::Future<void> &f) {
128 future.setFinished(); 128 future.setFinished();
129 f.setFinished(); 129 f.setFinished();
130 }).exec(); 130 }).exec();
@@ -195,7 +195,7 @@ void DummyResourceFacade::readValue(QSharedPointer<Akonadi2::Storage> storage, c
195 195
196Async::Job<void> DummyResourceFacade::load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback) 196Async::Job<void> DummyResourceFacade::load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback)
197{ 197{
198 return synchronizeResource(query.syncOnDemand).then<void>([=](Async::Future<void> &future) { 198 return synchronizeResource(query.syncOnDemand, query.processAll).then<void>([=](Async::Future<void> &future) {
199 //Now that the sync is complete we can execute the query 199 //Now that the sync is complete we can execute the query
200 const auto preparedQuery = prepareQuery(query); 200 const auto preparedQuery = prepareQuery(query);
201 201
diff --git a/dummyresource/facade.h b/dummyresource/facade.h
index c9c8047..1f69161 100644
--- a/dummyresource/facade.h
+++ b/dummyresource/facade.h
@@ -44,7 +44,7 @@ public:
44 44
45private: 45private:
46 void readValue(QSharedPointer<Akonadi2::Storage> storage, const QByteArray &key, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback, std::function<bool(const std::string &key, DummyCalendar::DummyEvent const *buffer, Akonadi2::Domain::Buffer::Event const *local)>); 46 void readValue(QSharedPointer<Akonadi2::Storage> storage, const QByteArray &key, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback, std::function<bool(const std::string &key, DummyCalendar::DummyEvent const *buffer, Akonadi2::Domain::Buffer::Event const *local)>);
47 Async::Job<void> synchronizeResource(bool sync); 47 Async::Job<void> synchronizeResource(bool sync, bool processAll);
48 QSharedPointer<Akonadi2::ResourceAccess> mResourceAccess; 48 QSharedPointer<Akonadi2::ResourceAccess> mResourceAccess;
49 QSharedPointer<DomainTypeAdaptorFactory<Akonadi2::Domain::Event, Akonadi2::Domain::Buffer::Event, DummyCalendar::DummyEvent> > mFactory; 49 QSharedPointer<DomainTypeAdaptorFactory<Akonadi2::Domain::Event, Akonadi2::Domain::Buffer::Event, DummyCalendar::DummyEvent> > mFactory;
50}; 50};
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp
index 18083cb..f510cd5 100644
--- a/dummyresource/resourcefactory.cpp
+++ b/dummyresource/resourcefactory.cpp
@@ -387,7 +387,14 @@ Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli
387 } 387 }
388 } 388 }
389 //TODO find items to remove 389 //TODO find items to remove
390 qDebug() << "sync complete";
391 f.setFinished();
392 });
393}
390 394
395Async::Job<void> DummyResource::processAllMessages()
396{
397 return Async::start<void>([this](Async::Future<void> &f) {
391 //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. 398 //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed.
392 //TODO: report errors while processing sync? 399 //TODO: report errors while processing sync?
393 if (mSynchronizerQueue.isEmpty()) { 400 if (mSynchronizerQueue.isEmpty()) {
diff --git a/dummyresource/resourcefactory.h b/dummyresource/resourcefactory.h
index 767eba1..3b99d5e 100644
--- a/dummyresource/resourcefactory.h
+++ b/dummyresource/resourcefactory.h
@@ -35,6 +35,7 @@ class DummyResource : public Akonadi2::Resource
35public: 35public:
36 DummyResource(); 36 DummyResource();
37 Async::Job<void> synchronizeWithSource(Akonadi2::Pipeline *pipeline); 37 Async::Job<void> synchronizeWithSource(Akonadi2::Pipeline *pipeline);
38 Async::Job<void> processAllMessages();
38 void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline); 39 void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline);
39 void configurePipeline(Akonadi2::Pipeline *pipeline); 40 void configurePipeline(Akonadi2::Pipeline *pipeline);
40 int error() const; 41 int error() const;
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp
index a84623d..6098856 100644
--- a/synchronizer/listener.cpp
+++ b/synchronizer/listener.cpp
@@ -28,6 +28,7 @@
28#include "common/commandcompletion_generated.h" 28#include "common/commandcompletion_generated.h"
29#include "common/handshake_generated.h" 29#include "common/handshake_generated.h"
30#include "common/revisionupdate_generated.h" 30#include "common/revisionupdate_generated.h"
31#include "common/synchronize_generated.h"
31 32
32#include <QLocalSocket> 33#include <QLocalSocket>
33#include <QTimer> 34#include <QTimer>
@@ -189,18 +190,38 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin
189 break; 190 break;
190 } 191 }
191 case Akonadi2::Commands::SynchronizeCommand: { 192 case Akonadi2::Commands::SynchronizeCommand: {
192 log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); 193 flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size);
193 loadResource(); 194 if (Akonadi2::VerifySynchronizeBuffer(verifier)) {
194 if (m_resource) { 195 auto buffer = Akonadi2::GetSynchronize(client.commandBuffer.constData());
195 qDebug() << "synchronizing"; 196 log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name));
196 m_resource->synchronizeWithSource(m_pipeline).then<void>([callback](Async::Future<void> &f){ 197 loadResource();
197 //FIXME: the command is complete once all pipelines have been drained, track progress and async emit result 198 if (!m_resource) {
198 callback(); 199 qWarning() << "No resource loaded";
199 f.setFinished(); 200 break;
200 }).exec(); 201 }
202 //TODO a more elegant composition of jobs should be possible
203 if (buffer->sourceSync()) {
204 bool localSync = buffer->localSync();
205 m_resource->synchronizeWithSource(m_pipeline).then<void>([callback, localSync, this](Async::Future<void> &f){
206 if (localSync) {
207 m_resource->processAllMessages().then<void>([callback](Async::Future<void> &f){
208 callback();
209 f.setFinished();
210 }).exec();
211 } else {
212 callback();
213 f.setFinished();
214 }
215 }).exec();
216 } else if (buffer->localSync()) {
217 m_resource->processAllMessages().then<void>([callback](Async::Future<void> &f){
218 callback();
219 f.setFinished();
220 }).exec();
221 }
201 return; 222 return;
202 } else { 223 } else {
203 qWarning() << "No resource loaded"; 224 qWarning() << "received invalid command";
204 } 225 }
205 break; 226 break;
206 } 227 }
diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp
index 4e9d0a5..5508452 100644
--- a/tests/dummyresourcetest.cpp
+++ b/tests/dummyresourcetest.cpp
@@ -108,12 +108,10 @@ private Q_SLOTS:
108 event.setProperty("summary", "summaryValue"); 108 event.setProperty("summary", "summaryValue");
109 Akonadi2::Store::create<Akonadi2::Domain::Event>(event, "org.kde.dummy"); 109 Akonadi2::Store::create<Akonadi2::Domain::Event>(event, "org.kde.dummy");
110 110
111 //TODO required to ensure all messages have been processed. The query should ensure this.
112 QTest::qWait(300);
113
114 Akonadi2::Query query; 111 Akonadi2::Query query;
115 query.resources << "org.kde.dummy"; 112 query.resources << "org.kde.dummy";
116 query.syncOnDemand = false; 113 query.syncOnDemand = false;
114 query.processAll = false;
117 115
118 query.propertyFilter.insert("uid", "testuid"); 116 query.propertyFilter.insert("uid", "testuid");
119 async::SyncListResult<Akonadi2::Domain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::Domain::Event>(query)); 117 async::SyncListResult<Akonadi2::Domain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::Domain::Event>(query));
@@ -141,6 +139,8 @@ private Q_SLOTS:
141 { 139 {
142 Akonadi2::Query query; 140 Akonadi2::Query query;
143 query.resources << "org.kde.dummy"; 141 query.resources << "org.kde.dummy";
142 query.syncOnDemand = true;
143 query.processAll = true;
144 144
145 async::SyncListResult<Akonadi2::Domain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::Domain::Event>(query)); 145 async::SyncListResult<Akonadi2::Domain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::Domain::Event>(query));
146 result.exec(); 146 result.exec();