summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/CMakeLists.txt38
-rw-r--r--common/clientapi.cpp2
-rw-r--r--common/clientapi.h16
-rw-r--r--common/facade.h35
-rw-r--r--common/genericresource.cpp32
-rw-r--r--common/genericresource.h4
-rw-r--r--common/pipeline.cpp9
-rw-r--r--common/pipeline.h5
-rw-r--r--common/resource.cpp8
-rw-r--r--common/resource.h7
-rw-r--r--common/resourceaccess.cpp30
-rw-r--r--common/resourceaccess.h11
12 files changed, 104 insertions, 93 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index f6847a7..2ece210 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -1,19 +1,6 @@
1include_directories(${CMAKE_CURRENT_BINARY_DIR})
2
1project(akonadi2common) 3project(akonadi2common)
2generate_flatbuffers(
3 commands/commandcompletion
4 commands/createentity
5 commands/deleteentity
6 commands/fetchentity
7 commands/handshake
8 commands/modifyentity
9 commands/revisionupdate
10 commands/synchronize
11 commands/notification
12 domain/event
13 entity
14 metadata
15 queuedcommand
16)
17 4
18if (STORAGE_unqlite) 5if (STORAGE_unqlite)
19 add_definitions(-DUNQLITE_ENABLE_THREADS -fpermissive) 6 add_definitions(-DUNQLITE_ENABLE_THREADS -fpermissive)
@@ -41,8 +28,27 @@ set(command_SRCS
41 ${storage_SRCS}) 28 ${storage_SRCS})
42 29
43add_library(${PROJECT_NAME} SHARED ${command_SRCS}) 30add_library(${PROJECT_NAME} SHARED ${command_SRCS})
31
32generate_flatbuffers(
33 ${PROJECT_NAME}
34
35 commands/commandcompletion
36 commands/createentity
37 commands/deleteentity
38 commands/fetchentity
39 commands/handshake
40 commands/modifyentity
41 commands/revisionupdate
42 commands/synchronize
43 commands/notification
44 domain/event
45 entity
46 metadata
47 queuedcommand
48)
49
44generate_export_header(${PROJECT_NAME} BASE_NAME Akonadi2Common EXPORT_FILE_NAME akonadi2common_export.h) 50generate_export_header(${PROJECT_NAME} BASE_NAME Akonadi2Common EXPORT_FILE_NAME akonadi2common_export.h)
45SET_TARGET_PROPERTIES(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX) 51SET_TARGET_PROPERTIES(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX)
46qt5_use_modules(${PROJECT_NAME} Network) 52qt5_use_modules(${PROJECT_NAME} Network)
47target_link_libraries(${PROJECT_NAME} ${storage_LIBS} akonadi2async) 53target_link_libraries(${PROJECT_NAME} ${storage_LIBS} KF5::Async)
48install(TARGETS ${PROJECT_NAME} ${KDE_INSTALL_TARGETS_DEFAULT_ARGS}) 54install(TARGETS ${PROJECT_NAME} ${KDE_INSTALL_TARGETS_DEFAULT_ARGS})
diff --git a/common/clientapi.cpp b/common/clientapi.cpp
index e4608c8..d287fcf 100644
--- a/common/clientapi.cpp
+++ b/common/clientapi.cpp
@@ -42,7 +42,7 @@ QByteArray getTypeName<AkonadiResource>()
42void Store::shutdown(const QByteArray &identifier) 42void Store::shutdown(const QByteArray &identifier)
43{ 43{
44 Trace() << "shutdown"; 44 Trace() << "shutdown";
45 ResourceAccess::connectToServer(identifier).then<void, QSharedPointer<QLocalSocket>>([identifier](const QSharedPointer<QLocalSocket> &socket, Async::Future<void> &future) { 45 ResourceAccess::connectToServer(identifier).then<void, QSharedPointer<QLocalSocket>>([identifier](const QSharedPointer<QLocalSocket> &socket, KAsync::Future<void> &future) {
46 //We can't currently reuse the socket 46 //We can't currently reuse the socket
47 socket->close(); 47 socket->close();
48 auto resourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(identifier); 48 auto resourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(identifier);
diff --git a/common/clientapi.h b/common/clientapi.h
index 1bd8bdc..c098bb5 100644
--- a/common/clientapi.h
+++ b/common/clientapi.h
@@ -28,8 +28,10 @@
28#include <QEventLoop> 28#include <QEventLoop>
29#include <functional> 29#include <functional>
30#include <memory> 30#include <memory>
31
32#include <Async/Async>
33
31#include "threadboundary.h" 34#include "threadboundary.h"
32#include "async/src/async.h"
33#include "resultprovider.h" 35#include "resultprovider.h"
34 36
35namespace async { 37namespace async {
@@ -228,10 +230,10 @@ class StoreFacade {
228public: 230public:
229 virtual ~StoreFacade(){}; 231 virtual ~StoreFacade(){};
230 QByteArray type() const { return ApplicationDomain::getTypeName<DomainType>(); } 232 QByteArray type() const { return ApplicationDomain::getTypeName<DomainType>(); }
231 virtual Async::Job<void> create(const DomainType &domainObject) = 0; 233 virtual KAsync::Job<void> create(const DomainType &domainObject) = 0;
232 virtual Async::Job<void> modify(const DomainType &domainObject) = 0; 234 virtual KAsync::Job<void> modify(const DomainType &domainObject) = 0;
233 virtual Async::Job<void> remove(const DomainType &domainObject) = 0; 235 virtual KAsync::Job<void> remove(const DomainType &domainObject) = 0;
234 virtual Async::Job<void> load(const Query &query, const QSharedPointer<ResultProvider<typename DomainType::Ptr> > &resultProvider) = 0; 236 virtual KAsync::Job<void> load(const Query &query, const QSharedPointer<ResultProvider<typename DomainType::Ptr> > &resultProvider) = 0;
235}; 237};
236 238
237 239
@@ -341,8 +343,8 @@ public:
341 //The result provider must be threadsafe. 343 //The result provider must be threadsafe.
342 async::run([query, resultSet](){ 344 async::run([query, resultSet](){
343 // Query all resources and aggregate results 345 // Query all resources and aggregate results
344 Async::iterate(query.resources) 346 KAsync::iterate(query.resources)
345 .template each<void, QByteArray>([query, resultSet](const QByteArray &resource, Async::Future<void> &future) { 347 .template each<void, QByteArray>([query, resultSet](const QByteArray &resource, KAsync::Future<void> &future) {
346 //TODO pass resource identifier to factory 348 //TODO pass resource identifier to factory
347 auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); 349 auto facade = FacadeFactory::instance().getFacade<DomainType>(resource);
348 if (facade) { 350 if (facade) {
diff --git a/common/facade.h b/common/facade.h
index 8c6578f..5743aa2 100644
--- a/common/facade.h
+++ b/common/facade.h
@@ -23,7 +23,8 @@
23 23
24#include <QByteArray> 24#include <QByteArray>
25 25
26#include "async/src/async.h" 26#include <Async/Async>
27
27#include "resourceaccess.h" 28#include "resourceaccess.h"
28#include "commands.h" 29#include "commands.h"
29#include "createentity_generated.h" 30#include "createentity_generated.h"
@@ -44,13 +45,13 @@ class QueryRunner : public QObject
44{ 45{
45 Q_OBJECT 46 Q_OBJECT
46public: 47public:
47 typedef std::function<Async::Job<qint64>(qint64 oldRevision, qint64 newRevision)> QueryFunction; 48 typedef std::function<KAsync::Job<qint64>(qint64 oldRevision, qint64 newRevision)> QueryFunction;
48 49
49 QueryRunner(const Akonadi2::Query &query) : mLatestRevision(0) {}; 50 QueryRunner(const Akonadi2::Query &query) : mLatestRevision(0) {};
50 /** 51 /**
51 * Starts query 52 * Starts query
52 */ 53 */
53 Async::Job<void> run(qint64 newRevision = 0) 54 KAsync::Job<void> run(qint64 newRevision = 0)
54 { 55 {
55 //TODO: JOBAPI: that last empty .then should not be necessary 56 //TODO: JOBAPI: that last empty .then should not be necessary
56 return queryFunction(mLatestRevision, newRevision).then<void, qint64>([this](qint64 revision) { 57 return queryFunction(mLatestRevision, newRevision).then<void, qint64>([this](qint64 revision) {
@@ -120,7 +121,7 @@ public:
120 return Akonadi2::ApplicationDomain::getTypeName<DomainType>(); 121 return Akonadi2::ApplicationDomain::getTypeName<DomainType>();
121 } 122 }
122 123
123 Async::Job<void> create(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE 124 KAsync::Job<void> create(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE
124 { 125 {
125 if (!mDomainTypeAdaptorFactory) { 126 if (!mDomainTypeAdaptorFactory) {
126 Warning() << "No domain type adaptor factory available"; 127 Warning() << "No domain type adaptor factory available";
@@ -130,25 +131,25 @@ public:
130 return sendCreateCommand(bufferTypeForDomainType(), QByteArray::fromRawData(reinterpret_cast<const char*>(entityFbb.GetBufferPointer()), entityFbb.GetSize())); 131 return sendCreateCommand(bufferTypeForDomainType(), QByteArray::fromRawData(reinterpret_cast<const char*>(entityFbb.GetBufferPointer()), entityFbb.GetSize()));
131 } 132 }
132 133
133 Async::Job<void> modify(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE 134 KAsync::Job<void> modify(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE
134 { 135 {
135 //TODO 136 //TODO
136 return Async::null<void>(); 137 return KAsync::null<void>();
137 } 138 }
138 139
139 Async::Job<void> remove(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE 140 KAsync::Job<void> remove(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE
140 { 141 {
141 //TODO 142 //TODO
142 return Async::null<void>(); 143 return KAsync::null<void>();
143 } 144 }
144 145
145 //TODO JOBAPI return job from sync continuation to execute it as subjob? 146 //TODO JOBAPI return job from sync continuation to execute it as subjob?
146 Async::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > &resultProvider) Q_DECL_OVERRIDE 147 KAsync::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > &resultProvider) Q_DECL_OVERRIDE
147 { 148 {
148 auto runner = QSharedPointer<QueryRunner>::create(query); 149 auto runner = QSharedPointer<QueryRunner>::create(query);
149 QWeakPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > weakResultProvider = resultProvider; 150 QWeakPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > weakResultProvider = resultProvider;
150 runner->setQuery([this, weakResultProvider, query] (qint64 oldRevision, qint64 newRevision) -> Async::Job<qint64> { 151 runner->setQuery([this, weakResultProvider, query] (qint64 oldRevision, qint64 newRevision) -> KAsync::Job<qint64> {
151 return Async::start<qint64>([this, weakResultProvider, query, oldRevision, newRevision](Async::Future<qint64> &future) { 152 return KAsync::start<qint64>([this, weakResultProvider, query, oldRevision, newRevision](KAsync::Future<qint64> &future) {
152 auto resultProvider = weakResultProvider.toStrongRef(); 153 auto resultProvider = weakResultProvider.toStrongRef();
153 if (!resultProvider) { 154 if (!resultProvider) {
154 Warning() << "Tried executing query after result provider is already gone"; 155 Warning() << "Tried executing query after result provider is already gone";
@@ -175,7 +176,7 @@ public:
175 } 176 }
176 177
177 //We have to capture the runner to keep it alive 178 //We have to capture the runner to keep it alive
178 return synchronizeResource(query.syncOnDemand, query.processAll).template then<void>([runner](Async::Future<void> &future) { 179 return synchronizeResource(query.syncOnDemand, query.processAll).template then<void>([runner](KAsync::Future<void> &future) {
179 runner->run().then<void>([&future]() { 180 runner->run().then<void>([&future]() {
180 future.setFinished(); 181 future.setFinished();
181 }).exec(); 182 }).exec();
@@ -183,7 +184,7 @@ public:
183 } 184 }
184 185
185protected: 186protected:
186 Async::Job<void> sendCreateCommand(const QByteArray &resourceBufferType, const QByteArray &buffer) 187 KAsync::Job<void> sendCreateCommand(const QByteArray &resourceBufferType, const QByteArray &buffer)
187 { 188 {
188 flatbuffers::FlatBufferBuilder fbb; 189 flatbuffers::FlatBufferBuilder fbb;
189 //This is the resource buffer type and not the domain type 190 //This is the resource buffer type and not the domain type
@@ -195,7 +196,7 @@ protected:
195 return mResourceAccess->sendCommand(Akonadi2::Commands::CreateEntityCommand, fbb); 196 return mResourceAccess->sendCommand(Akonadi2::Commands::CreateEntityCommand, fbb);
196 } 197 }
197 198
198 Async::Job<void> synchronizeResource(bool sync, bool processAll) 199 KAsync::Job<void> synchronizeResource(bool sync, bool processAll)
199 { 200 {
200 //TODO check if a sync is necessary 201 //TODO check if a sync is necessary
201 //TODO Only sync what was requested 202 //TODO Only sync what was requested
@@ -203,17 +204,17 @@ protected:
203 //TODO the synchronization should normally not be necessary: We just return what is already available. 204 //TODO the synchronization should normally not be necessary: We just return what is already available.
204 205
205 if (sync || processAll) { 206 if (sync || processAll) {
206 return Async::start<void>([=](Async::Future<void> &future) { 207 return KAsync::start<void>([=](KAsync::Future<void> &future) {
207 mResourceAccess->open(); 208 mResourceAccess->open();
208 mResourceAccess->synchronizeResource(sync, processAll).then<void>([&future]() { 209 mResourceAccess->synchronizeResource(sync, processAll).then<void>([&future]() {
209 future.setFinished(); 210 future.setFinished();
210 }).exec(); 211 }).exec();
211 }); 212 });
212 } 213 }
213 return Async::null<void>(); 214 return KAsync::null<void>();
214 } 215 }
215 216
216 virtual Async::Job<qint64> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr> > &resultProvider, qint64 oldRevision, qint64 newRevision) { return Async::null<qint64>(); }; 217 virtual KAsync::Job<qint64> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr> > &resultProvider, qint64 oldRevision, qint64 newRevision) { return KAsync::null<qint64>(); };
217 218
218protected: 219protected:
219 //TODO use one resource access instance per application => make static 220 //TODO use one resource access instance per application => make static
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index ea6413b..2394b80 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -61,33 +61,33 @@ private slots:
61 }).exec(); 61 }).exec();
62 } 62 }
63 63
64 Async::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) 64 KAsync::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand)
65 { 65 {
66 Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId()); 66 Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId());
67 //Throw command into appropriate pipeline 67 //Throw command into appropriate pipeline
68 switch (queuedCommand->commandId()) { 68 switch (queuedCommand->commandId()) {
69 case Akonadi2::Commands::DeleteEntityCommand: 69 case Akonadi2::Commands::DeleteEntityCommand:
70 //mPipeline->removedEntity 70 //mPipeline->removedEntity
71 return Async::null<void>(); 71 return KAsync::null<void>();
72 case Akonadi2::Commands::ModifyEntityCommand: 72 case Akonadi2::Commands::ModifyEntityCommand:
73 //mPipeline->modifiedEntity 73 //mPipeline->modifiedEntity
74 return Async::null<void>(); 74 return KAsync::null<void>();
75 case Akonadi2::Commands::CreateEntityCommand: 75 case Akonadi2::Commands::CreateEntityCommand:
76 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); 76 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
77 default: 77 default:
78 return Async::error<void>(-1, "Unhandled command"); 78 return KAsync::error<void>(-1, "Unhandled command");
79 } 79 }
80 return Async::null<void>(); 80 return KAsync::null<void>();
81 } 81 }
82 82
83 //Process all messages of this queue 83 //Process all messages of this queue
84 Async::Job<void> processQueue(MessageQueue *queue) 84 KAsync::Job<void> processQueue(MessageQueue *queue)
85 { 85 {
86 //TODO use something like: 86 //TODO use something like:
87 //Async::foreach("pass iterator here").each("process value here").join(); 87 //KAsync::foreach("pass iterator here").each("process value here").join();
88 //Async::foreach("pass iterator here").parallel("process value here").join(); 88 //KAsync::foreach("pass iterator here").parallel("process value here").join();
89 return Async::dowhile( 89 return KAsync::dowhile(
90 [this, queue](Async::Future<bool> &future) { 90 [this, queue](KAsync::Future<bool> &future) {
91 if (queue->isEmpty()) { 91 if (queue->isEmpty()) {
92 future.setValue(false); 92 future.setValue(false);
93 future.setFinished(); 93 future.setFinished();
@@ -133,13 +133,13 @@ private slots:
133 ); 133 );
134 } 134 }
135 135
136 Async::Job<void> processPipeline() 136 KAsync::Job<void> processPipeline()
137 { 137 {
138 //Go through all message queues 138 //Go through all message queues
139 auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); 139 auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues);
140 return Async::dowhile( 140 return KAsync::dowhile(
141 [it]() { return it->hasNext(); }, 141 [it]() { return it->hasNext(); },
142 [it, this](Async::Future<void> &future) { 142 [it, this](KAsync::Future<void> &future) {
143 auto queue = it->next(); 143 auto queue = it->next();
144 processQueue(queue).then<void>([&future]() { 144 processQueue(queue).then<void>([&future]() {
145 Trace() << "Queue processed"; 145 Trace() << "Queue processed";
@@ -206,12 +206,12 @@ void GenericResource::processCommand(int commandId, const QByteArray &data, uint
206 enqueueCommand(mUserQueue, commandId, data); 206 enqueueCommand(mUserQueue, commandId, data);
207} 207}
208 208
209Async::Job<void> GenericResource::processAllMessages() 209KAsync::Job<void> GenericResource::processAllMessages()
210{ 210{
211 //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. 211 //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed.
212 //TODO: report errors while processing sync? 212 //TODO: report errors while processing sync?
213 //TODO JOBAPI: A helper that waits for n events and then continues? 213 //TODO JOBAPI: A helper that waits for n events and then continues?
214 return Async::start<void>([this](Async::Future<void> &f) { 214 return KAsync::start<void>([this](KAsync::Future<void> &f) {
215 if (mSynchronizerQueue.isEmpty()) { 215 if (mSynchronizerQueue.isEmpty()) {
216 f.setFinished(); 216 f.setFinished();
217 } else { 217 } else {
@@ -219,7 +219,7 @@ Async::Job<void> GenericResource::processAllMessages()
219 f.setFinished(); 219 f.setFinished();
220 }); 220 });
221 } 221 }
222 }).then<void>([this](Async::Future<void> &f) { 222 }).then<void>([this](KAsync::Future<void> &f) {
223 if (mUserQueue.isEmpty()) { 223 if (mUserQueue.isEmpty()) {
224 f.setFinished(); 224 f.setFinished();
225 } else { 225 } else {
diff --git a/common/genericresource.h b/common/genericresource.h
index 36fa567..ac28575 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -38,8 +38,8 @@ public:
38 virtual ~GenericResource(); 38 virtual ~GenericResource();
39 39
40 virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline) Q_DECL_OVERRIDE; 40 virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline) Q_DECL_OVERRIDE;
41 virtual Async::Job<void> synchronizeWithSource(Pipeline *pipeline) Q_DECL_OVERRIDE = 0; 41 virtual KAsync::Job<void> synchronizeWithSource(Pipeline *pipeline) Q_DECL_OVERRIDE = 0;
42 virtual Async::Job<void> processAllMessages() Q_DECL_OVERRIDE; 42 virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE;
43 43
44 virtual void configurePipeline(Pipeline *pipeline) Q_DECL_OVERRIDE; 44 virtual void configurePipeline(Pipeline *pipeline) Q_DECL_OVERRIDE;
45 int error() const; 45 int error() const;
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index e2f23ed..21cf1c5 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -29,7 +29,6 @@
29#include "metadata_generated.h" 29#include "metadata_generated.h"
30#include "createentity_generated.h" 30#include "createentity_generated.h"
31#include "entitybuffer.h" 31#include "entitybuffer.h"
32#include "async/src/async.h"
33#include "log.h" 32#include "log.h"
34 33
35namespace Akonadi2 34namespace Akonadi2
@@ -94,7 +93,7 @@ void Pipeline::null()
94 // state.step(); 93 // state.step();
95} 94}
96 95
97Async::Job<void> Pipeline::newEntity(void const *command, size_t size) 96KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
98{ 97{
99 Log() << "Pipeline: New Entity"; 98 Log() << "Pipeline: New Entity";
100 99
@@ -107,7 +106,7 @@ Async::Job<void> Pipeline::newEntity(void const *command, size_t size)
107 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 106 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
108 if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) { 107 if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) {
109 qWarning() << "invalid buffer, not a create entity buffer"; 108 qWarning() << "invalid buffer, not a create entity buffer";
110 return Async::error<void>(); 109 return KAsync::error<void>();
111 } 110 }
112 } 111 }
113 auto createEntity = Akonadi2::Commands::GetCreateEntity(command); 112 auto createEntity = Akonadi2::Commands::GetCreateEntity(command);
@@ -118,7 +117,7 @@ Async::Job<void> Pipeline::newEntity(void const *command, size_t size)
118 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); 117 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size());
119 if (!Akonadi2::VerifyEntityBuffer(verifyer)) { 118 if (!Akonadi2::VerifyEntityBuffer(verifyer)) {
120 qWarning() << "invalid buffer, not an entity buffer"; 119 qWarning() << "invalid buffer, not an entity buffer";
121 return Async::error<void>(); 120 return KAsync::error<void>();
122 } 121 }
123 } 122 }
124 auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); 123 auto entity = Akonadi2::GetEntity(createEntity->delta()->Data());
@@ -139,7 +138,7 @@ Async::Job<void> Pipeline::newEntity(void const *command, size_t size)
139 storage().setMaxRevision(newRevision); 138 storage().setMaxRevision(newRevision);
140 Log() << "Pipeline: wrote entity: "<< newRevision; 139 Log() << "Pipeline: wrote entity: "<< newRevision;
141 140
142 return Async::start<void>([this, key, entityType](Async::Future<void> &future) { 141 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) {
143 PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() { 142 PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() {
144 future.setFinished(); 143 future.setFinished();
145 }); 144 });
diff --git a/common/pipeline.h b/common/pipeline.h
index a574d27..b695bde 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -27,7 +27,8 @@
27 27
28#include <akonadi2common_export.h> 28#include <akonadi2common_export.h>
29#include <storage.h> 29#include <storage.h>
30#include "async/src/async.h" 30
31#include <Async/Async>
31 32
32#include "entity_generated.h" 33#include "entity_generated.h"
33 34
@@ -53,7 +54,7 @@ public:
53 54
54 void null(); 55 void null();
55 56
56 Async::Job<void> newEntity(void const *command, size_t size); 57 KAsync::Job<void> newEntity(void const *command, size_t size);
57 void modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size); 58 void modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size);
58 void deletedEntity(const QString &entityType, const QByteArray &key); 59 void deletedEntity(const QString &entityType, const QByteArray &key);
59 60
diff --git a/common/resource.cpp b/common/resource.cpp
index e158a40..bd69afd 100644
--- a/common/resource.cpp
+++ b/common/resource.cpp
@@ -53,16 +53,16 @@ void Resource::processCommand(int commandId, const QByteArray &data, uint size,
53 pipeline->null(); 53 pipeline->null();
54} 54}
55 55
56Async::Job<void> Resource::synchronizeWithSource(Pipeline *pipeline) 56KAsync::Job<void> Resource::synchronizeWithSource(Pipeline *pipeline)
57{ 57{
58 return Async::start<void>([pipeline](Async::Future<void> &f) { 58 return KAsync::start<void>([pipeline](KAsync::Future<void> &f) {
59 pipeline->null(); 59 pipeline->null();
60 }); 60 });
61} 61}
62 62
63Async::Job<void> Resource::processAllMessages() 63KAsync::Job<void> Resource::processAllMessages()
64{ 64{
65 return Async::null<void>(); 65 return KAsync::null<void>();
66} 66}
67 67
68class ResourceFactory::Private 68class ResourceFactory::Private
diff --git a/common/resource.h b/common/resource.h
index 18a6827..ea1e9d8 100644
--- a/common/resource.h
+++ b/common/resource.h
@@ -21,7 +21,8 @@
21#include <akonadi2common_export.h> 21#include <akonadi2common_export.h>
22#include <clientapi.h> 22#include <clientapi.h>
23#include <pipeline.h> 23#include <pipeline.h>
24#include <async/src/async.h> 24
25#include <Async/Async>
25 26
26namespace Akonadi2 27namespace Akonadi2
27{ 28{
@@ -36,8 +37,8 @@ public:
36 virtual ~Resource(); 37 virtual ~Resource();
37 38
38 virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); 39 virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline);
39 virtual Async::Job<void> synchronizeWithSource(Pipeline *pipeline); 40 virtual KAsync::Job<void> synchronizeWithSource(Pipeline *pipeline);
40 virtual Async::Job<void> processAllMessages(); 41 virtual KAsync::Job<void> processAllMessages();
41 42
42 virtual void configurePipeline(Pipeline *pipeline); 43 virtual void configurePipeline(Pipeline *pipeline);
43 44
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index b7d569b..feffcf4 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -70,8 +70,8 @@ class ResourceAccess::Private
70{ 70{
71public: 71public:
72 Private(const QByteArray &name, ResourceAccess *ra); 72 Private(const QByteArray &name, ResourceAccess *ra);
73 Async::Job<void> tryToConnect(); 73 KAsync::Job<void> tryToConnect();
74 Async::Job<void> initializeSocket(); 74 KAsync::Job<void> initializeSocket();
75 QByteArray resourceName; 75 QByteArray resourceName;
76 QSharedPointer<QLocalSocket> socket; 76 QSharedPointer<QLocalSocket> socket;
77 QByteArray partialMessageBuffer; 77 QByteArray partialMessageBuffer;
@@ -89,10 +89,10 @@ ResourceAccess::Private::Private(const QByteArray &name, ResourceAccess *q)
89} 89}
90 90
91//Connects to server and returns connected socket on success 91//Connects to server and returns connected socket on success
92Async::Job<QSharedPointer<QLocalSocket> > ResourceAccess::connectToServer(const QByteArray &identifier) 92KAsync::Job<QSharedPointer<QLocalSocket> > ResourceAccess::connectToServer(const QByteArray &identifier)
93{ 93{
94 auto s = QSharedPointer<QLocalSocket>::create(); 94 auto s = QSharedPointer<QLocalSocket>::create();
95 return Async::start<QSharedPointer<QLocalSocket> >([identifier, s](Async::Future<QSharedPointer<QLocalSocket> > &future) { 95 return KAsync::start<QSharedPointer<QLocalSocket> >([identifier, s](KAsync::Future<QSharedPointer<QLocalSocket> > &future) {
96 s->setServerName(identifier); 96 s->setServerName(identifier);
97 auto context = new QObject; 97 auto context = new QObject;
98 QObject::connect(s.data(), &QLocalSocket::connected, context, [&future, &s, context]() { 98 QObject::connect(s.data(), &QLocalSocket::connected, context, [&future, &s, context]() {
@@ -109,15 +109,15 @@ Async::Job<QSharedPointer<QLocalSocket> > ResourceAccess::connectToServer(const
109 }); 109 });
110} 110}
111 111
112Async::Job<void> ResourceAccess::Private::tryToConnect() 112KAsync::Job<void> ResourceAccess::Private::tryToConnect()
113{ 113{
114 return Async::dowhile([this]() -> bool { 114 return KAsync::dowhile([this]() -> bool {
115 //TODO abort after N retries? 115 //TODO abort after N retries?
116 return !socket; 116 return !socket;
117 }, 117 },
118 [this](Async::Future<void> &future) { 118 [this](KAsync::Future<void> &future) {
119 Trace() << "Loop"; 119 Trace() << "Loop";
120 Async::wait(50) 120 KAsync::wait(50)
121 .then(connectToServer(resourceName)) 121 .then(connectToServer(resourceName))
122 .then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) { 122 .then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) {
123 Q_ASSERT(s); 123 Q_ASSERT(s);
@@ -130,9 +130,9 @@ Async::Job<void> ResourceAccess::Private::tryToConnect()
130 }); 130 });
131} 131}
132 132
133Async::Job<void> ResourceAccess::Private::initializeSocket() 133KAsync::Job<void> ResourceAccess::Private::initializeSocket()
134{ 134{
135 return Async::start<void>([this](Async::Future<void> &future) { 135 return KAsync::start<void>([this](KAsync::Future<void> &future) {
136 Trace() << "Trying to connect"; 136 Trace() << "Trying to connect";
137 connectToServer(resourceName).then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) { 137 connectToServer(resourceName).then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) {
138 Trace() << "Connected to resource, without having to start it."; 138 Trace() << "Connected to resource, without having to start it.";
@@ -189,9 +189,9 @@ void ResourceAccess::registerCallback(uint messageId, const std::function<void(i
189 d->resultHandler.insert(messageId, callback); 189 d->resultHandler.insert(messageId, callback);
190} 190}
191 191
192Async::Job<void> ResourceAccess::sendCommand(int commandId) 192KAsync::Job<void> ResourceAccess::sendCommand(int commandId)
193{ 193{
194 return Async::start<void>([this, commandId](Async::Future<void> &f) { 194 return KAsync::start<void>([this, commandId](KAsync::Future<void> &f) {
195 auto continuation = [&f](int error, const QString &errorMessage) { 195 auto continuation = [&f](int error, const QString &errorMessage) {
196 if (error) { 196 if (error) {
197 f.setError(error, errorMessage); 197 f.setError(error, errorMessage);
@@ -205,11 +205,11 @@ Async::Job<void> ResourceAccess::sendCommand(int commandId)
205 }); 205 });
206} 206}
207 207
208Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) 208KAsync::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb)
209{ 209{
210 //The flatbuffer is transient, but we want to store it until the job is executed 210 //The flatbuffer is transient, but we want to store it until the job is executed
211 QByteArray buffer(reinterpret_cast<const char*>(fbb.GetBufferPointer()), fbb.GetSize()); 211 QByteArray buffer(reinterpret_cast<const char*>(fbb.GetBufferPointer()), fbb.GetSize());
212 return Async::start<void>([commandId, buffer, this](Async::Future<void> &f) { 212 return KAsync::start<void>([commandId, buffer, this](KAsync::Future<void> &f) {
213 auto callback = [&f](int error, const QString &errorMessage) { 213 auto callback = [&f](int error, const QString &errorMessage) {
214 if (error) { 214 if (error) {
215 f.setError(error, errorMessage); 215 f.setError(error, errorMessage);
@@ -225,7 +225,7 @@ Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu
225 }); 225 });
226} 226}
227 227
228Async::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool localSync) 228KAsync::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool localSync)
229{ 229{
230 auto command = Akonadi2::CreateSynchronize(d->fbb, sourceSync, localSync); 230 auto command = Akonadi2::CreateSynchronize(d->fbb, sourceSync, localSync);
231 Akonadi2::FinishSynchronizeBuffer(d->fbb, command); 231 Akonadi2::FinishSynchronizeBuffer(d->fbb, command);
diff --git a/common/resourceaccess.h b/common/resourceaccess.h
index c16a9d2..b779db9 100644
--- a/common/resourceaccess.h
+++ b/common/resourceaccess.h
@@ -24,8 +24,9 @@
24#include <QObject> 24#include <QObject>
25#include <QTimer> 25#include <QTimer>
26 26
27#include <Async/Async>
28
27#include <flatbuffers/flatbuffers.h> 29#include <flatbuffers/flatbuffers.h>
28#include <async/src/async.h>
29 30
30namespace Akonadi2 31namespace Akonadi2
31{ 32{
@@ -43,13 +44,13 @@ public:
43 QByteArray resourceName() const; 44 QByteArray resourceName() const;
44 bool isReady() const; 45 bool isReady() const;
45 46
46 Async::Job<void> sendCommand(int commandId); 47 KAsync::Job<void> sendCommand(int commandId);
47 Async::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); 48 KAsync::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb);
48 Async::Job<void> synchronizeResource(bool remoteSync, bool localSync); 49 KAsync::Job<void> synchronizeResource(bool remoteSync, bool localSync);
49 /** 50 /**
50 * Tries to connect to server, and returns a connected socket on success. 51 * Tries to connect to server, and returns a connected socket on success.
51 */ 52 */
52 static Async::Job<QSharedPointer<QLocalSocket> > connectToServer(const QByteArray &identifier); 53 static KAsync::Job<QSharedPointer<QLocalSocket> > connectToServer(const QByteArray &identifier);
53 54
54public Q_SLOTS: 55public Q_SLOTS:
55 void open(); 56 void open();