summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/clientapi.h23
-rw-r--r--common/resourceaccess.cpp63
-rw-r--r--common/resourceaccess.h5
-rw-r--r--common/test/clientapitest.cpp6
4 files changed, 66 insertions, 31 deletions
diff --git a/common/clientapi.h b/common/clientapi.h
index d2b1c9c..2f1c127 100644
--- a/common/clientapi.h
+++ b/common/clientapi.h
@@ -30,6 +30,7 @@
30#include <QtConcurrent/QtConcurrentRun> 30#include <QtConcurrent/QtConcurrentRun>
31#include <functional> 31#include <functional>
32#include "threadboundary.h" 32#include "threadboundary.h"
33#include "async/src/async.h"
33 34
34namespace async { 35namespace async {
35 //This should abstract if we execute from eventloop or in thread. 36 //This should abstract if we execute from eventloop or in thread.
@@ -185,6 +186,11 @@ public:
185 186
186class MemoryBufferAdaptor : public BufferAdaptor { 187class MemoryBufferAdaptor : public BufferAdaptor {
187public: 188public:
189 MemoryBufferAdaptor()
190 : BufferAdaptor()
191 {
192 }
193
188 MemoryBufferAdaptor(const BufferAdaptor &buffer) 194 MemoryBufferAdaptor(const BufferAdaptor &buffer)
189 : BufferAdaptor() 195 : BufferAdaptor()
190 { 196 {
@@ -208,6 +214,11 @@ private:
208 */ 214 */
209class AkonadiDomainType { 215class AkonadiDomainType {
210public: 216public:
217 AkonadiDomainType()
218 :mAdaptor(new MemoryBufferAdaptor())
219 {
220
221 }
211 AkonadiDomainType(const QString &resourceName, const QString &identifier, qint64 revision, const QSharedPointer<BufferAdaptor> &adaptor) 222 AkonadiDomainType(const QString &resourceName, const QString &identifier, qint64 revision, const QSharedPointer<BufferAdaptor> &adaptor)
212 : mAdaptor(adaptor), 223 : mAdaptor(adaptor),
213 mResourceName(resourceName), 224 mResourceName(resourceName),
@@ -310,9 +321,9 @@ class StoreFacade {
310public: 321public:
311 virtual ~StoreFacade(){}; 322 virtual ~StoreFacade(){};
312 QString type() const { return Domain::getTypeName<DomainType>(); } 323 QString type() const { return Domain::getTypeName<DomainType>(); }
313 virtual void create(const DomainType &domainObject) = 0; 324 virtual Async::Job<void> create(const DomainType &domainObject) = 0;
314 virtual void modify(const DomainType &domainObject) = 0; 325 virtual Async::Job<void> modify(const DomainType &domainObject) = 0;
315 virtual void remove(const DomainType &domainObject) = 0; 326 virtual Async::Job<void> remove(const DomainType &domainObject) = 0;
316 virtual void load(const Query &query, const std::function<void(const typename DomainType::Ptr &)> &resultCallback, const std::function<void()> &completeCallback) = 0; 327 virtual void load(const Query &query, const std::function<void(const typename DomainType::Ptr &)> &resultCallback, const std::function<void()> &completeCallback) = 0;
317}; 328};
318 329
@@ -440,11 +451,15 @@ public:
440 /** 451 /**
441 * Create a new entity. 452 * Create a new entity.
442 */ 453 */
454 //TODO return job that tracks progress until resource has stored the message in it's queue?
443 template <class DomainType> 455 template <class DomainType>
444 static void create(const DomainType &domainObject, const QString &resourceIdentifier) { 456 static void create(const DomainType &domainObject, const QString &resourceIdentifier) {
445 //Potentially move to separate thread as well 457 //Potentially move to separate thread as well
446 auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceIdentifier); 458 auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceIdentifier);
447 facade.create(domainObject); 459 auto job = facade->create(domainObject);
460 auto future = job.exec();
461 future.waitForFinished();
462 //TODO return job?
448 } 463 }
449 464
450 /** 465 /**
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 9fb0d4c..7b13101 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -133,41 +133,62 @@ void ResourceAccess::registerCallback(uint messageId, const std::function<void()
133 d->resultHandler.insert(messageId, callback); 133 d->resultHandler.insert(messageId, callback);
134} 134}
135 135
136void ResourceAccess::sendCommand(int commandId, const std::function<void()> &callback) 136Async::Job<void> ResourceAccess::sendCommand(int commandId)
137{ 137{
138 if (isReady()) { 138 return Async::start<void>([this, commandId](Async::Future<void> &f) {
139 log(QString("Sending command %1").arg(commandId)); 139 if (isReady()) {
140 d->messageId++; 140 log(QString("Sending command %1").arg(commandId));
141 d->messageId++;
142 registerCallback(d->messageId, [&f]() { f.setFinished(); });
143 Commands::write(d->socket, d->messageId, commandId);
144 } else {
145 d->commandQueue << new QueuedCommand(commandId, [&f]() { f.setFinished(); });
146 }
147 });
148}
149
150struct JobFinisher {
151 bool finished;
152 std::function<void()> callback;
153
154 JobFinisher() : finished(false) {}
155
156 void setFinished() {
157 finished = true;
141 if (callback) { 158 if (callback) {
142 registerCallback(d->messageId, callback); 159 callback();
143 } 160 }
144 Commands::write(d->socket, d->messageId, commandId);
145 } else {
146 d->commandQueue << new QueuedCommand(commandId, callback);
147 } 161 }
148} 162};
149 163
150void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback) 164Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb)
151{ 165{
166 auto finisher = QSharedPointer<JobFinisher>::create();
167 auto callback = [finisher] () {
168 finisher->setFinished();
169 };
152 if (isReady()) { 170 if (isReady()) {
153 log(QString("Sending command %1").arg(commandId));
154 d->messageId++; 171 d->messageId++;
155 if (callback) { 172 log(QString("Sending command %1 with messageId %2").arg(commandId).arg(d->messageId));
156 registerCallback(d->messageId, callback); 173 registerCallback(d->messageId, callback);
157 }
158 Commands::write(d->socket, d->messageId, commandId, fbb); 174 Commands::write(d->socket, d->messageId, commandId, fbb);
159 } else { 175 } else {
160 d->commandQueue << new QueuedCommand(commandId, fbb, callback); 176 d->commandQueue << new QueuedCommand(commandId, fbb, callback);
161 } 177 }
178 return Async::start<void>([this, finisher](Async::Future<void> &f) {
179 if (finisher->finished) {
180 f.setFinished();
181 } else {
182 finisher->callback = [&f]() {
183 f.setFinished();
184 };
185 }
186 });
162} 187}
163 188
164Async::Job<void> ResourceAccess::synchronizeResource() 189Async::Job<void> ResourceAccess::synchronizeResource()
165{ 190{
166 return Async::start<void>([this](Async::Future<void> &f) { 191 return sendCommand(Commands::SynchronizeCommand);
167 sendCommand(Commands::SynchronizeCommand, [&f]() {
168 f.setFinished();
169 });
170 });
171} 192}
172 193
173void ResourceAccess::open() 194void ResourceAccess::open()
@@ -214,7 +235,7 @@ void ResourceAccess::connected()
214 log(QString("We have %1 queued commands").arg(d->commandQueue.size())); 235 log(QString("We have %1 queued commands").arg(d->commandQueue.size()));
215 for (QueuedCommand *command: d->commandQueue) { 236 for (QueuedCommand *command: d->commandQueue) {
216 d->messageId++; 237 d->messageId++;
217 log(QString("Sending command %1").arg(command->commandId)); 238 log(QString("Sending command %1 with messageId %2").arg(command->commandId).arg(d->messageId));
218 if (command->callback) { 239 if (command->callback) {
219 registerCallback(d->messageId, command->callback); 240 registerCallback(d->messageId, command->callback);
220 } 241 }
@@ -294,7 +315,7 @@ bool ResourceAccess::processMessageBuffer()
294 } 315 }
295 case Commands::CommandCompletion: { 316 case Commands::CommandCompletion: {
296 auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); 317 auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize);
297 log(QString("Command %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); 318 log(QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully"));
298 //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc 319 //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc
299 320
300 //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first 321 //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first
diff --git a/common/resourceaccess.h b/common/resourceaccess.h
index 0a333f6..a9e8c47 100644
--- a/common/resourceaccess.h
+++ b/common/resourceaccess.h
@@ -41,9 +41,8 @@ public:
41 QString resourceName() const; 41 QString resourceName() const;
42 bool isReady() const; 42 bool isReady() const;
43 43
44 //TODO use jobs 44 Async::Job<void> sendCommand(int commandId);
45 void sendCommand(int commandId, const std::function<void()> &callback = std::function<void()>()); 45 Async::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb);
46 void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback = std::function<void()>());
47 Async::Job<void> synchronizeResource(); 46 Async::Job<void> synchronizeResource();
48 47
49public Q_SLOTS: 48public Q_SLOTS:
diff --git a/common/test/clientapitest.cpp b/common/test/clientapitest.cpp
index 2ba0dcf..16616a3 100644
--- a/common/test/clientapitest.cpp
+++ b/common/test/clientapitest.cpp
@@ -8,9 +8,9 @@ class DummyResourceFacade : public Akonadi2::StoreFacade<Akonadi2::Domain::Event
8{ 8{
9public: 9public:
10 ~DummyResourceFacade(){}; 10 ~DummyResourceFacade(){};
11 virtual void create(const Akonadi2::Domain::Event &domainObject){}; 11 virtual Async::Job<void> create(const Akonadi2::Domain::Event &domainObject){ return Async::null<void>(); };
12 virtual void modify(const Akonadi2::Domain::Event &domainObject){}; 12 virtual Async::Job<void> modify(const Akonadi2::Domain::Event &domainObject){ return Async::null<void>(); };
13 virtual void remove(const Akonadi2::Domain::Event &domainObject){}; 13 virtual Async::Job<void> remove(const Akonadi2::Domain::Event &domainObject){ return Async::null<void>(); };
14 virtual void load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback, const std::function<void()> &completeCallback) 14 virtual void load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback, const std::function<void()> &completeCallback)
15 { 15 {
16 qDebug() << "load called"; 16 qDebug() << "load called";