summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/clientapi.h23
-rw-r--r--common/resourceaccess.cpp63
-rw-r--r--common/resourceaccess.h5
-rw-r--r--common/test/clientapitest.cpp6
-rw-r--r--dummyresource/facade.cpp14
-rw-r--r--dummyresource/facade.h6
-rw-r--r--synchronizer/listener.cpp5
7 files changed, 82 insertions, 40 deletions
diff --git a/common/clientapi.h b/common/clientapi.h
index d2b1c9c..2f1c127 100644
--- a/common/clientapi.h
+++ b/common/clientapi.h
@@ -30,6 +30,7 @@
30#include <QtConcurrent/QtConcurrentRun> 30#include <QtConcurrent/QtConcurrentRun>
31#include <functional> 31#include <functional>
32#include "threadboundary.h" 32#include "threadboundary.h"
33#include "async/src/async.h"
33 34
34namespace async { 35namespace async {
35 //This should abstract if we execute from eventloop or in thread. 36 //This should abstract if we execute from eventloop or in thread.
@@ -185,6 +186,11 @@ public:
185 186
186class MemoryBufferAdaptor : public BufferAdaptor { 187class MemoryBufferAdaptor : public BufferAdaptor {
187public: 188public:
189 MemoryBufferAdaptor()
190 : BufferAdaptor()
191 {
192 }
193
188 MemoryBufferAdaptor(const BufferAdaptor &buffer) 194 MemoryBufferAdaptor(const BufferAdaptor &buffer)
189 : BufferAdaptor() 195 : BufferAdaptor()
190 { 196 {
@@ -208,6 +214,11 @@ private:
208 */ 214 */
209class AkonadiDomainType { 215class AkonadiDomainType {
210public: 216public:
217 AkonadiDomainType()
218 :mAdaptor(new MemoryBufferAdaptor())
219 {
220
221 }
211 AkonadiDomainType(const QString &resourceName, const QString &identifier, qint64 revision, const QSharedPointer<BufferAdaptor> &adaptor) 222 AkonadiDomainType(const QString &resourceName, const QString &identifier, qint64 revision, const QSharedPointer<BufferAdaptor> &adaptor)
212 : mAdaptor(adaptor), 223 : mAdaptor(adaptor),
213 mResourceName(resourceName), 224 mResourceName(resourceName),
@@ -310,9 +321,9 @@ class StoreFacade {
310public: 321public:
311 virtual ~StoreFacade(){}; 322 virtual ~StoreFacade(){};
312 QString type() const { return Domain::getTypeName<DomainType>(); } 323 QString type() const { return Domain::getTypeName<DomainType>(); }
313 virtual void create(const DomainType &domainObject) = 0; 324 virtual Async::Job<void> create(const DomainType &domainObject) = 0;
314 virtual void modify(const DomainType &domainObject) = 0; 325 virtual Async::Job<void> modify(const DomainType &domainObject) = 0;
315 virtual void remove(const DomainType &domainObject) = 0; 326 virtual Async::Job<void> remove(const DomainType &domainObject) = 0;
316 virtual void load(const Query &query, const std::function<void(const typename DomainType::Ptr &)> &resultCallback, const std::function<void()> &completeCallback) = 0; 327 virtual void load(const Query &query, const std::function<void(const typename DomainType::Ptr &)> &resultCallback, const std::function<void()> &completeCallback) = 0;
317}; 328};
318 329
@@ -440,11 +451,15 @@ public:
440 /** 451 /**
441 * Create a new entity. 452 * Create a new entity.
442 */ 453 */
454 //TODO return job that tracks progress until resource has stored the message in it's queue?
443 template <class DomainType> 455 template <class DomainType>
444 static void create(const DomainType &domainObject, const QString &resourceIdentifier) { 456 static void create(const DomainType &domainObject, const QString &resourceIdentifier) {
445 //Potentially move to separate thread as well 457 //Potentially move to separate thread as well
446 auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceIdentifier); 458 auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceIdentifier);
447 facade.create(domainObject); 459 auto job = facade->create(domainObject);
460 auto future = job.exec();
461 future.waitForFinished();
462 //TODO return job?
448 } 463 }
449 464
450 /** 465 /**
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 9fb0d4c..7b13101 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -133,41 +133,62 @@ void ResourceAccess::registerCallback(uint messageId, const std::function<void()
133 d->resultHandler.insert(messageId, callback); 133 d->resultHandler.insert(messageId, callback);
134} 134}
135 135
136void ResourceAccess::sendCommand(int commandId, const std::function<void()> &callback) 136Async::Job<void> ResourceAccess::sendCommand(int commandId)
137{ 137{
138 if (isReady()) { 138 return Async::start<void>([this, commandId](Async::Future<void> &f) {
139 log(QString("Sending command %1").arg(commandId)); 139 if (isReady()) {
140 d->messageId++; 140 log(QString("Sending command %1").arg(commandId));
141 d->messageId++;
142 registerCallback(d->messageId, [&f]() { f.setFinished(); });
143 Commands::write(d->socket, d->messageId, commandId);
144 } else {
145 d->commandQueue << new QueuedCommand(commandId, [&f]() { f.setFinished(); });
146 }
147 });
148}
149
150struct JobFinisher {
151 bool finished;
152 std::function<void()> callback;
153
154 JobFinisher() : finished(false) {}
155
156 void setFinished() {
157 finished = true;
141 if (callback) { 158 if (callback) {
142 registerCallback(d->messageId, callback); 159 callback();
143 } 160 }
144 Commands::write(d->socket, d->messageId, commandId);
145 } else {
146 d->commandQueue << new QueuedCommand(commandId, callback);
147 } 161 }
148} 162};
149 163
150void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback) 164Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb)
151{ 165{
166 auto finisher = QSharedPointer<JobFinisher>::create();
167 auto callback = [finisher] () {
168 finisher->setFinished();
169 };
152 if (isReady()) { 170 if (isReady()) {
153 log(QString("Sending command %1").arg(commandId));
154 d->messageId++; 171 d->messageId++;
155 if (callback) { 172 log(QString("Sending command %1 with messageId %2").arg(commandId).arg(d->messageId));
156 registerCallback(d->messageId, callback); 173 registerCallback(d->messageId, callback);
157 }
158 Commands::write(d->socket, d->messageId, commandId, fbb); 174 Commands::write(d->socket, d->messageId, commandId, fbb);
159 } else { 175 } else {
160 d->commandQueue << new QueuedCommand(commandId, fbb, callback); 176 d->commandQueue << new QueuedCommand(commandId, fbb, callback);
161 } 177 }
178 return Async::start<void>([this, finisher](Async::Future<void> &f) {
179 if (finisher->finished) {
180 f.setFinished();
181 } else {
182 finisher->callback = [&f]() {
183 f.setFinished();
184 };
185 }
186 });
162} 187}
163 188
164Async::Job<void> ResourceAccess::synchronizeResource() 189Async::Job<void> ResourceAccess::synchronizeResource()
165{ 190{
166 return Async::start<void>([this](Async::Future<void> &f) { 191 return sendCommand(Commands::SynchronizeCommand);
167 sendCommand(Commands::SynchronizeCommand, [&f]() {
168 f.setFinished();
169 });
170 });
171} 192}
172 193
173void ResourceAccess::open() 194void ResourceAccess::open()
@@ -214,7 +235,7 @@ void ResourceAccess::connected()
214 log(QString("We have %1 queued commands").arg(d->commandQueue.size())); 235 log(QString("We have %1 queued commands").arg(d->commandQueue.size()));
215 for (QueuedCommand *command: d->commandQueue) { 236 for (QueuedCommand *command: d->commandQueue) {
216 d->messageId++; 237 d->messageId++;
217 log(QString("Sending command %1").arg(command->commandId)); 238 log(QString("Sending command %1 with messageId %2").arg(command->commandId).arg(d->messageId));
218 if (command->callback) { 239 if (command->callback) {
219 registerCallback(d->messageId, command->callback); 240 registerCallback(d->messageId, command->callback);
220 } 241 }
@@ -294,7 +315,7 @@ bool ResourceAccess::processMessageBuffer()
294 } 315 }
295 case Commands::CommandCompletion: { 316 case Commands::CommandCompletion: {
296 auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); 317 auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize);
297 log(QString("Command %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); 318 log(QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully"));
298 //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc 319 //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc
299 320
300 //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first 321 //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first
diff --git a/common/resourceaccess.h b/common/resourceaccess.h
index 0a333f6..a9e8c47 100644
--- a/common/resourceaccess.h
+++ b/common/resourceaccess.h
@@ -41,9 +41,8 @@ public:
41 QString resourceName() const; 41 QString resourceName() const;
42 bool isReady() const; 42 bool isReady() const;
43 43
44 //TODO use jobs 44 Async::Job<void> sendCommand(int commandId);
45 void sendCommand(int commandId, const std::function<void()> &callback = std::function<void()>()); 45 Async::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb);
46 void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback = std::function<void()>());
47 Async::Job<void> synchronizeResource(); 46 Async::Job<void> synchronizeResource();
48 47
49public Q_SLOTS: 48public Q_SLOTS:
diff --git a/common/test/clientapitest.cpp b/common/test/clientapitest.cpp
index 2ba0dcf..16616a3 100644
--- a/common/test/clientapitest.cpp
+++ b/common/test/clientapitest.cpp
@@ -8,9 +8,9 @@ class DummyResourceFacade : public Akonadi2::StoreFacade<Akonadi2::Domain::Event
8{ 8{
9public: 9public:
10 ~DummyResourceFacade(){}; 10 ~DummyResourceFacade(){};
11 virtual void create(const Akonadi2::Domain::Event &domainObject){}; 11 virtual Async::Job<void> create(const Akonadi2::Domain::Event &domainObject){ return Async::null<void>(); };
12 virtual void modify(const Akonadi2::Domain::Event &domainObject){}; 12 virtual Async::Job<void> modify(const Akonadi2::Domain::Event &domainObject){ return Async::null<void>(); };
13 virtual void remove(const Akonadi2::Domain::Event &domainObject){}; 13 virtual Async::Job<void> remove(const Akonadi2::Domain::Event &domainObject){ return Async::null<void>(); };
14 virtual void load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback, const std::function<void()> &completeCallback) 14 virtual void load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback, const std::function<void()> &completeCallback)
15 { 15 {
16 qDebug() << "load called"; 16 qDebug() << "load called";
diff --git a/dummyresource/facade.cpp b/dummyresource/facade.cpp
index 668fbbf..13c174b 100644
--- a/dummyresource/facade.cpp
+++ b/dummyresource/facade.cpp
@@ -46,7 +46,7 @@ DummyResourceFacade::~DummyResourceFacade()
46{ 46{
47} 47}
48 48
49void DummyResourceFacade::create(const Akonadi2::Domain::Event &domainObject) 49Async::Job<void> DummyResourceFacade::create(const Akonadi2::Domain::Event &domainObject)
50{ 50{
51 //Create message buffer and send to resource 51 //Create message buffer and send to resource
52 flatbuffers::FlatBufferBuilder eventFbb; 52 flatbuffers::FlatBufferBuilder eventFbb;
@@ -64,24 +64,28 @@ void DummyResourceFacade::create(const Akonadi2::Domain::Event &domainObject)
64 Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, eventFbb.GetBufferPointer(), eventFbb.GetSize(), 0, 0); 64 Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, eventFbb.GetBufferPointer(), eventFbb.GetSize(), 0, 0);
65 65
66 flatbuffers::FlatBufferBuilder fbb; 66 flatbuffers::FlatBufferBuilder fbb;
67 auto type = fbb.CreateString(Akonadi2::Domain::getTypeName<Akonadi2::Domain::Event>().toStdString().data()); 67 //This is the resource type and not the domain type
68 auto type = fbb.CreateString("event");
68 auto delta = fbb.CreateVector<uint8_t>(entityFbb.GetBufferPointer(), entityFbb.GetSize()); 69 auto delta = fbb.CreateVector<uint8_t>(entityFbb.GetBufferPointer(), entityFbb.GetSize());
69 Akonadi2::Commands::CreateEntityBuilder builder(fbb); 70 Akonadi2::Commands::CreateEntityBuilder builder(fbb);
70 builder.add_domainType(type); 71 builder.add_domainType(type);
71 builder.add_delta(delta); 72 builder.add_delta(delta);
72 auto location = builder.Finish(); 73 auto location = builder.Finish();
73 Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); 74 Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location);
74 mResourceAccess->sendCommand(Akonadi2::Commands::CreateEntityCommand, fbb); 75 mResourceAccess->open();
76 return mResourceAccess->sendCommand(Akonadi2::Commands::CreateEntityCommand, fbb);
75} 77}
76 78
77void DummyResourceFacade::modify(const Akonadi2::Domain::Event &domainObject) 79Async::Job<void> DummyResourceFacade::modify(const Akonadi2::Domain::Event &domainObject)
78{ 80{
79 //Create message buffer and send to resource 81 //Create message buffer and send to resource
82 return Async::null<void>();
80} 83}
81 84
82void DummyResourceFacade::remove(const Akonadi2::Domain::Event &domainObject) 85Async::Job<void> DummyResourceFacade::remove(const Akonadi2::Domain::Event &domainObject)
83{ 86{
84 //Create message buffer and send to resource 87 //Create message buffer and send to resource
88 return Async::null<void>();
85} 89}
86 90
87static std::function<bool(const std::string &key, DummyEvent const *buffer)> prepareQuery(const Akonadi2::Query &query) 91static std::function<bool(const std::string &key, DummyEvent const *buffer)> prepareQuery(const Akonadi2::Query &query)
diff --git a/dummyresource/facade.h b/dummyresource/facade.h
index e01d254..9c8827a 100644
--- a/dummyresource/facade.h
+++ b/dummyresource/facade.h
@@ -37,9 +37,9 @@ class DummyResourceFacade : public Akonadi2::StoreFacade<Akonadi2::Domain::Event
37public: 37public:
38 DummyResourceFacade(); 38 DummyResourceFacade();
39 virtual ~DummyResourceFacade(); 39 virtual ~DummyResourceFacade();
40 virtual void create(const Akonadi2::Domain::Event &domainObject); 40 virtual Async::Job<void> create(const Akonadi2::Domain::Event &domainObject);
41 virtual void modify(const Akonadi2::Domain::Event &domainObject); 41 virtual Async::Job<void> modify(const Akonadi2::Domain::Event &domainObject);
42 virtual void remove(const Akonadi2::Domain::Event &domainObject); 42 virtual Async::Job<void> remove(const Akonadi2::Domain::Event &domainObject);
43 virtual void load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback, const std::function<void()> &completeCallback); 43 virtual void load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback, const std::function<void()> &completeCallback);
44 44
45private: 45private:
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp
index dc0d9dd..a84623d 100644
--- a/synchronizer/listener.cpp
+++ b/synchronizer/listener.cpp
@@ -209,7 +209,10 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin
209 case Akonadi2::Commands::ModifyEntityCommand: 209 case Akonadi2::Commands::ModifyEntityCommand:
210 case Akonadi2::Commands::CreateEntityCommand: 210 case Akonadi2::Commands::CreateEntityCommand:
211 log(QString("\tCommand id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); 211 log(QString("\tCommand id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name));
212 m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); 212 loadResource();
213 if (m_resource) {
214 m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline);
215 }
213 break; 216 break;
214 default: 217 default:
215 if (commandId > Akonadi2::Commands::CustomCommand) { 218 if (commandId > Akonadi2::Commands::CustomCommand) {