diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-01-18 10:51:34 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-01-18 10:51:34 +0100 |
commit | 47b4442c585a25b2e4b857f2d9e3ab371d942c19 (patch) | |
tree | 4167c6cb75e3ee8072452585f1b433fb637e3389 | |
parent | aef2ebc45a30d3c3b15b630648e8b37a551ce1ef (diff) | |
download | sink-47b4442c585a25b2e4b857f2d9e3ab371d942c19.tar.gz sink-47b4442c585a25b2e4b857f2d9e3ab371d942c19.zip |
Use jobs to track progress of write commands.
-rw-r--r-- | common/clientapi.h | 23 | ||||
-rw-r--r-- | common/resourceaccess.cpp | 63 | ||||
-rw-r--r-- | common/resourceaccess.h | 5 | ||||
-rw-r--r-- | common/test/clientapitest.cpp | 6 | ||||
-rw-r--r-- | dummyresource/facade.cpp | 14 | ||||
-rw-r--r-- | dummyresource/facade.h | 6 | ||||
-rw-r--r-- | synchronizer/listener.cpp | 5 |
7 files changed, 82 insertions, 40 deletions
diff --git a/common/clientapi.h b/common/clientapi.h index d2b1c9c..2f1c127 100644 --- a/common/clientapi.h +++ b/common/clientapi.h | |||
@@ -30,6 +30,7 @@ | |||
30 | #include <QtConcurrent/QtConcurrentRun> | 30 | #include <QtConcurrent/QtConcurrentRun> |
31 | #include <functional> | 31 | #include <functional> |
32 | #include "threadboundary.h" | 32 | #include "threadboundary.h" |
33 | #include "async/src/async.h" | ||
33 | 34 | ||
34 | namespace async { | 35 | namespace async { |
35 | //This should abstract if we execute from eventloop or in thread. | 36 | //This should abstract if we execute from eventloop or in thread. |
@@ -185,6 +186,11 @@ public: | |||
185 | 186 | ||
186 | class MemoryBufferAdaptor : public BufferAdaptor { | 187 | class MemoryBufferAdaptor : public BufferAdaptor { |
187 | public: | 188 | public: |
189 | MemoryBufferAdaptor() | ||
190 | : BufferAdaptor() | ||
191 | { | ||
192 | } | ||
193 | |||
188 | MemoryBufferAdaptor(const BufferAdaptor &buffer) | 194 | MemoryBufferAdaptor(const BufferAdaptor &buffer) |
189 | : BufferAdaptor() | 195 | : BufferAdaptor() |
190 | { | 196 | { |
@@ -208,6 +214,11 @@ private: | |||
208 | */ | 214 | */ |
209 | class AkonadiDomainType { | 215 | class AkonadiDomainType { |
210 | public: | 216 | public: |
217 | AkonadiDomainType() | ||
218 | :mAdaptor(new MemoryBufferAdaptor()) | ||
219 | { | ||
220 | |||
221 | } | ||
211 | AkonadiDomainType(const QString &resourceName, const QString &identifier, qint64 revision, const QSharedPointer<BufferAdaptor> &adaptor) | 222 | AkonadiDomainType(const QString &resourceName, const QString &identifier, qint64 revision, const QSharedPointer<BufferAdaptor> &adaptor) |
212 | : mAdaptor(adaptor), | 223 | : mAdaptor(adaptor), |
213 | mResourceName(resourceName), | 224 | mResourceName(resourceName), |
@@ -310,9 +321,9 @@ class StoreFacade { | |||
310 | public: | 321 | public: |
311 | virtual ~StoreFacade(){}; | 322 | virtual ~StoreFacade(){}; |
312 | QString type() const { return Domain::getTypeName<DomainType>(); } | 323 | QString type() const { return Domain::getTypeName<DomainType>(); } |
313 | virtual void create(const DomainType &domainObject) = 0; | 324 | virtual Async::Job<void> create(const DomainType &domainObject) = 0; |
314 | virtual void modify(const DomainType &domainObject) = 0; | 325 | virtual Async::Job<void> modify(const DomainType &domainObject) = 0; |
315 | virtual void remove(const DomainType &domainObject) = 0; | 326 | virtual Async::Job<void> remove(const DomainType &domainObject) = 0; |
316 | virtual void load(const Query &query, const std::function<void(const typename DomainType::Ptr &)> &resultCallback, const std::function<void()> &completeCallback) = 0; | 327 | virtual void load(const Query &query, const std::function<void(const typename DomainType::Ptr &)> &resultCallback, const std::function<void()> &completeCallback) = 0; |
317 | }; | 328 | }; |
318 | 329 | ||
@@ -440,11 +451,15 @@ public: | |||
440 | /** | 451 | /** |
441 | * Create a new entity. | 452 | * Create a new entity. |
442 | */ | 453 | */ |
454 | //TODO return job that tracks progress until resource has stored the message in it's queue? | ||
443 | template <class DomainType> | 455 | template <class DomainType> |
444 | static void create(const DomainType &domainObject, const QString &resourceIdentifier) { | 456 | static void create(const DomainType &domainObject, const QString &resourceIdentifier) { |
445 | //Potentially move to separate thread as well | 457 | //Potentially move to separate thread as well |
446 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceIdentifier); | 458 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceIdentifier); |
447 | facade.create(domainObject); | 459 | auto job = facade->create(domainObject); |
460 | auto future = job.exec(); | ||
461 | future.waitForFinished(); | ||
462 | //TODO return job? | ||
448 | } | 463 | } |
449 | 464 | ||
450 | /** | 465 | /** |
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 9fb0d4c..7b13101 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -133,41 +133,62 @@ void ResourceAccess::registerCallback(uint messageId, const std::function<void() | |||
133 | d->resultHandler.insert(messageId, callback); | 133 | d->resultHandler.insert(messageId, callback); |
134 | } | 134 | } |
135 | 135 | ||
136 | void ResourceAccess::sendCommand(int commandId, const std::function<void()> &callback) | 136 | Async::Job<void> ResourceAccess::sendCommand(int commandId) |
137 | { | 137 | { |
138 | if (isReady()) { | 138 | return Async::start<void>([this, commandId](Async::Future<void> &f) { |
139 | log(QString("Sending command %1").arg(commandId)); | 139 | if (isReady()) { |
140 | d->messageId++; | 140 | log(QString("Sending command %1").arg(commandId)); |
141 | d->messageId++; | ||
142 | registerCallback(d->messageId, [&f]() { f.setFinished(); }); | ||
143 | Commands::write(d->socket, d->messageId, commandId); | ||
144 | } else { | ||
145 | d->commandQueue << new QueuedCommand(commandId, [&f]() { f.setFinished(); }); | ||
146 | } | ||
147 | }); | ||
148 | } | ||
149 | |||
150 | struct JobFinisher { | ||
151 | bool finished; | ||
152 | std::function<void()> callback; | ||
153 | |||
154 | JobFinisher() : finished(false) {} | ||
155 | |||
156 | void setFinished() { | ||
157 | finished = true; | ||
141 | if (callback) { | 158 | if (callback) { |
142 | registerCallback(d->messageId, callback); | 159 | callback(); |
143 | } | 160 | } |
144 | Commands::write(d->socket, d->messageId, commandId); | ||
145 | } else { | ||
146 | d->commandQueue << new QueuedCommand(commandId, callback); | ||
147 | } | 161 | } |
148 | } | 162 | }; |
149 | 163 | ||
150 | void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback) | 164 | Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) |
151 | { | 165 | { |
166 | auto finisher = QSharedPointer<JobFinisher>::create(); | ||
167 | auto callback = [finisher] () { | ||
168 | finisher->setFinished(); | ||
169 | }; | ||
152 | if (isReady()) { | 170 | if (isReady()) { |
153 | log(QString("Sending command %1").arg(commandId)); | ||
154 | d->messageId++; | 171 | d->messageId++; |
155 | if (callback) { | 172 | log(QString("Sending command %1 with messageId %2").arg(commandId).arg(d->messageId)); |
156 | registerCallback(d->messageId, callback); | 173 | registerCallback(d->messageId, callback); |
157 | } | ||
158 | Commands::write(d->socket, d->messageId, commandId, fbb); | 174 | Commands::write(d->socket, d->messageId, commandId, fbb); |
159 | } else { | 175 | } else { |
160 | d->commandQueue << new QueuedCommand(commandId, fbb, callback); | 176 | d->commandQueue << new QueuedCommand(commandId, fbb, callback); |
161 | } | 177 | } |
178 | return Async::start<void>([this, finisher](Async::Future<void> &f) { | ||
179 | if (finisher->finished) { | ||
180 | f.setFinished(); | ||
181 | } else { | ||
182 | finisher->callback = [&f]() { | ||
183 | f.setFinished(); | ||
184 | }; | ||
185 | } | ||
186 | }); | ||
162 | } | 187 | } |
163 | 188 | ||
164 | Async::Job<void> ResourceAccess::synchronizeResource() | 189 | Async::Job<void> ResourceAccess::synchronizeResource() |
165 | { | 190 | { |
166 | return Async::start<void>([this](Async::Future<void> &f) { | 191 | return sendCommand(Commands::SynchronizeCommand); |
167 | sendCommand(Commands::SynchronizeCommand, [&f]() { | ||
168 | f.setFinished(); | ||
169 | }); | ||
170 | }); | ||
171 | } | 192 | } |
172 | 193 | ||
173 | void ResourceAccess::open() | 194 | void ResourceAccess::open() |
@@ -214,7 +235,7 @@ void ResourceAccess::connected() | |||
214 | log(QString("We have %1 queued commands").arg(d->commandQueue.size())); | 235 | log(QString("We have %1 queued commands").arg(d->commandQueue.size())); |
215 | for (QueuedCommand *command: d->commandQueue) { | 236 | for (QueuedCommand *command: d->commandQueue) { |
216 | d->messageId++; | 237 | d->messageId++; |
217 | log(QString("Sending command %1").arg(command->commandId)); | 238 | log(QString("Sending command %1 with messageId %2").arg(command->commandId).arg(d->messageId)); |
218 | if (command->callback) { | 239 | if (command->callback) { |
219 | registerCallback(d->messageId, command->callback); | 240 | registerCallback(d->messageId, command->callback); |
220 | } | 241 | } |
@@ -294,7 +315,7 @@ bool ResourceAccess::processMessageBuffer() | |||
294 | } | 315 | } |
295 | case Commands::CommandCompletion: { | 316 | case Commands::CommandCompletion: { |
296 | auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); | 317 | auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); |
297 | log(QString("Command %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); | 318 | log(QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); |
298 | //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc | 319 | //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc |
299 | 320 | ||
300 | //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first | 321 | //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first |
diff --git a/common/resourceaccess.h b/common/resourceaccess.h index 0a333f6..a9e8c47 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h | |||
@@ -41,9 +41,8 @@ public: | |||
41 | QString resourceName() const; | 41 | QString resourceName() const; |
42 | bool isReady() const; | 42 | bool isReady() const; |
43 | 43 | ||
44 | //TODO use jobs | 44 | Async::Job<void> sendCommand(int commandId); |
45 | void sendCommand(int commandId, const std::function<void()> &callback = std::function<void()>()); | 45 | Async::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); |
46 | void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback = std::function<void()>()); | ||
47 | Async::Job<void> synchronizeResource(); | 46 | Async::Job<void> synchronizeResource(); |
48 | 47 | ||
49 | public Q_SLOTS: | 48 | public Q_SLOTS: |
diff --git a/common/test/clientapitest.cpp b/common/test/clientapitest.cpp index 2ba0dcf..16616a3 100644 --- a/common/test/clientapitest.cpp +++ b/common/test/clientapitest.cpp | |||
@@ -8,9 +8,9 @@ class DummyResourceFacade : public Akonadi2::StoreFacade<Akonadi2::Domain::Event | |||
8 | { | 8 | { |
9 | public: | 9 | public: |
10 | ~DummyResourceFacade(){}; | 10 | ~DummyResourceFacade(){}; |
11 | virtual void create(const Akonadi2::Domain::Event &domainObject){}; | 11 | virtual Async::Job<void> create(const Akonadi2::Domain::Event &domainObject){ return Async::null<void>(); }; |
12 | virtual void modify(const Akonadi2::Domain::Event &domainObject){}; | 12 | virtual Async::Job<void> modify(const Akonadi2::Domain::Event &domainObject){ return Async::null<void>(); }; |
13 | virtual void remove(const Akonadi2::Domain::Event &domainObject){}; | 13 | virtual Async::Job<void> remove(const Akonadi2::Domain::Event &domainObject){ return Async::null<void>(); }; |
14 | virtual void load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback, const std::function<void()> &completeCallback) | 14 | virtual void load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback, const std::function<void()> &completeCallback) |
15 | { | 15 | { |
16 | qDebug() << "load called"; | 16 | qDebug() << "load called"; |
diff --git a/dummyresource/facade.cpp b/dummyresource/facade.cpp index 668fbbf..13c174b 100644 --- a/dummyresource/facade.cpp +++ b/dummyresource/facade.cpp | |||
@@ -46,7 +46,7 @@ DummyResourceFacade::~DummyResourceFacade() | |||
46 | { | 46 | { |
47 | } | 47 | } |
48 | 48 | ||
49 | void DummyResourceFacade::create(const Akonadi2::Domain::Event &domainObject) | 49 | Async::Job<void> DummyResourceFacade::create(const Akonadi2::Domain::Event &domainObject) |
50 | { | 50 | { |
51 | //Create message buffer and send to resource | 51 | //Create message buffer and send to resource |
52 | flatbuffers::FlatBufferBuilder eventFbb; | 52 | flatbuffers::FlatBufferBuilder eventFbb; |
@@ -64,24 +64,28 @@ void DummyResourceFacade::create(const Akonadi2::Domain::Event &domainObject) | |||
64 | Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, eventFbb.GetBufferPointer(), eventFbb.GetSize(), 0, 0); | 64 | Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, eventFbb.GetBufferPointer(), eventFbb.GetSize(), 0, 0); |
65 | 65 | ||
66 | flatbuffers::FlatBufferBuilder fbb; | 66 | flatbuffers::FlatBufferBuilder fbb; |
67 | auto type = fbb.CreateString(Akonadi2::Domain::getTypeName<Akonadi2::Domain::Event>().toStdString().data()); | 67 | //This is the resource type and not the domain type |
68 | auto type = fbb.CreateString("event"); | ||
68 | auto delta = fbb.CreateVector<uint8_t>(entityFbb.GetBufferPointer(), entityFbb.GetSize()); | 69 | auto delta = fbb.CreateVector<uint8_t>(entityFbb.GetBufferPointer(), entityFbb.GetSize()); |
69 | Akonadi2::Commands::CreateEntityBuilder builder(fbb); | 70 | Akonadi2::Commands::CreateEntityBuilder builder(fbb); |
70 | builder.add_domainType(type); | 71 | builder.add_domainType(type); |
71 | builder.add_delta(delta); | 72 | builder.add_delta(delta); |
72 | auto location = builder.Finish(); | 73 | auto location = builder.Finish(); |
73 | Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); | 74 | Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); |
74 | mResourceAccess->sendCommand(Akonadi2::Commands::CreateEntityCommand, fbb); | 75 | mResourceAccess->open(); |
76 | return mResourceAccess->sendCommand(Akonadi2::Commands::CreateEntityCommand, fbb); | ||
75 | } | 77 | } |
76 | 78 | ||
77 | void DummyResourceFacade::modify(const Akonadi2::Domain::Event &domainObject) | 79 | Async::Job<void> DummyResourceFacade::modify(const Akonadi2::Domain::Event &domainObject) |
78 | { | 80 | { |
79 | //Create message buffer and send to resource | 81 | //Create message buffer and send to resource |
82 | return Async::null<void>(); | ||
80 | } | 83 | } |
81 | 84 | ||
82 | void DummyResourceFacade::remove(const Akonadi2::Domain::Event &domainObject) | 85 | Async::Job<void> DummyResourceFacade::remove(const Akonadi2::Domain::Event &domainObject) |
83 | { | 86 | { |
84 | //Create message buffer and send to resource | 87 | //Create message buffer and send to resource |
88 | return Async::null<void>(); | ||
85 | } | 89 | } |
86 | 90 | ||
87 | static std::function<bool(const std::string &key, DummyEvent const *buffer)> prepareQuery(const Akonadi2::Query &query) | 91 | static std::function<bool(const std::string &key, DummyEvent const *buffer)> prepareQuery(const Akonadi2::Query &query) |
diff --git a/dummyresource/facade.h b/dummyresource/facade.h index e01d254..9c8827a 100644 --- a/dummyresource/facade.h +++ b/dummyresource/facade.h | |||
@@ -37,9 +37,9 @@ class DummyResourceFacade : public Akonadi2::StoreFacade<Akonadi2::Domain::Event | |||
37 | public: | 37 | public: |
38 | DummyResourceFacade(); | 38 | DummyResourceFacade(); |
39 | virtual ~DummyResourceFacade(); | 39 | virtual ~DummyResourceFacade(); |
40 | virtual void create(const Akonadi2::Domain::Event &domainObject); | 40 | virtual Async::Job<void> create(const Akonadi2::Domain::Event &domainObject); |
41 | virtual void modify(const Akonadi2::Domain::Event &domainObject); | 41 | virtual Async::Job<void> modify(const Akonadi2::Domain::Event &domainObject); |
42 | virtual void remove(const Akonadi2::Domain::Event &domainObject); | 42 | virtual Async::Job<void> remove(const Akonadi2::Domain::Event &domainObject); |
43 | virtual void load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback, const std::function<void()> &completeCallback); | 43 | virtual void load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback, const std::function<void()> &completeCallback); |
44 | 44 | ||
45 | private: | 45 | private: |
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index dc0d9dd..a84623d 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp | |||
@@ -209,7 +209,10 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin | |||
209 | case Akonadi2::Commands::ModifyEntityCommand: | 209 | case Akonadi2::Commands::ModifyEntityCommand: |
210 | case Akonadi2::Commands::CreateEntityCommand: | 210 | case Akonadi2::Commands::CreateEntityCommand: |
211 | log(QString("\tCommand id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); | 211 | log(QString("\tCommand id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); |
212 | m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); | 212 | loadResource(); |
213 | if (m_resource) { | ||
214 | m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); | ||
215 | } | ||
213 | break; | 216 | break; |
214 | default: | 217 | default: |
215 | if (commandId > Akonadi2::Commands::CustomCommand) { | 218 | if (commandId > Akonadi2::Commands::CustomCommand) { |