diff options
Diffstat (limited to 'common')
-rw-r--r-- | common/CMakeLists.txt | 38 | ||||
-rw-r--r-- | common/clientapi.cpp | 2 | ||||
-rw-r--r-- | common/clientapi.h | 16 | ||||
-rw-r--r-- | common/facade.h | 35 | ||||
-rw-r--r-- | common/genericresource.cpp | 32 | ||||
-rw-r--r-- | common/genericresource.h | 4 | ||||
-rw-r--r-- | common/pipeline.cpp | 9 | ||||
-rw-r--r-- | common/pipeline.h | 5 | ||||
-rw-r--r-- | common/resource.cpp | 8 | ||||
-rw-r--r-- | common/resource.h | 7 | ||||
-rw-r--r-- | common/resourceaccess.cpp | 30 | ||||
-rw-r--r-- | common/resourceaccess.h | 11 |
12 files changed, 104 insertions, 93 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index f6847a7..2ece210 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt | |||
@@ -1,19 +1,6 @@ | |||
1 | include_directories(${CMAKE_CURRENT_BINARY_DIR}) | ||
2 | |||
1 | project(akonadi2common) | 3 | project(akonadi2common) |
2 | generate_flatbuffers( | ||
3 | commands/commandcompletion | ||
4 | commands/createentity | ||
5 | commands/deleteentity | ||
6 | commands/fetchentity | ||
7 | commands/handshake | ||
8 | commands/modifyentity | ||
9 | commands/revisionupdate | ||
10 | commands/synchronize | ||
11 | commands/notification | ||
12 | domain/event | ||
13 | entity | ||
14 | metadata | ||
15 | queuedcommand | ||
16 | ) | ||
17 | 4 | ||
18 | if (STORAGE_unqlite) | 5 | if (STORAGE_unqlite) |
19 | add_definitions(-DUNQLITE_ENABLE_THREADS -fpermissive) | 6 | add_definitions(-DUNQLITE_ENABLE_THREADS -fpermissive) |
@@ -41,8 +28,27 @@ set(command_SRCS | |||
41 | ${storage_SRCS}) | 28 | ${storage_SRCS}) |
42 | 29 | ||
43 | add_library(${PROJECT_NAME} SHARED ${command_SRCS}) | 30 | add_library(${PROJECT_NAME} SHARED ${command_SRCS}) |
31 | |||
32 | generate_flatbuffers( | ||
33 | ${PROJECT_NAME} | ||
34 | |||
35 | commands/commandcompletion | ||
36 | commands/createentity | ||
37 | commands/deleteentity | ||
38 | commands/fetchentity | ||
39 | commands/handshake | ||
40 | commands/modifyentity | ||
41 | commands/revisionupdate | ||
42 | commands/synchronize | ||
43 | commands/notification | ||
44 | domain/event | ||
45 | entity | ||
46 | metadata | ||
47 | queuedcommand | ||
48 | ) | ||
49 | |||
44 | generate_export_header(${PROJECT_NAME} BASE_NAME Akonadi2Common EXPORT_FILE_NAME akonadi2common_export.h) | 50 | generate_export_header(${PROJECT_NAME} BASE_NAME Akonadi2Common EXPORT_FILE_NAME akonadi2common_export.h) |
45 | SET_TARGET_PROPERTIES(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX) | 51 | SET_TARGET_PROPERTIES(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX) |
46 | qt5_use_modules(${PROJECT_NAME} Network) | 52 | qt5_use_modules(${PROJECT_NAME} Network) |
47 | target_link_libraries(${PROJECT_NAME} ${storage_LIBS} akonadi2async) | 53 | target_link_libraries(${PROJECT_NAME} ${storage_LIBS} KF5::Async) |
48 | install(TARGETS ${PROJECT_NAME} ${KDE_INSTALL_TARGETS_DEFAULT_ARGS}) | 54 | install(TARGETS ${PROJECT_NAME} ${KDE_INSTALL_TARGETS_DEFAULT_ARGS}) |
diff --git a/common/clientapi.cpp b/common/clientapi.cpp index e4608c8..d287fcf 100644 --- a/common/clientapi.cpp +++ b/common/clientapi.cpp | |||
@@ -42,7 +42,7 @@ QByteArray getTypeName<AkonadiResource>() | |||
42 | void Store::shutdown(const QByteArray &identifier) | 42 | void Store::shutdown(const QByteArray &identifier) |
43 | { | 43 | { |
44 | Trace() << "shutdown"; | 44 | Trace() << "shutdown"; |
45 | ResourceAccess::connectToServer(identifier).then<void, QSharedPointer<QLocalSocket>>([identifier](const QSharedPointer<QLocalSocket> &socket, Async::Future<void> &future) { | 45 | ResourceAccess::connectToServer(identifier).then<void, QSharedPointer<QLocalSocket>>([identifier](const QSharedPointer<QLocalSocket> &socket, KAsync::Future<void> &future) { |
46 | //We can't currently reuse the socket | 46 | //We can't currently reuse the socket |
47 | socket->close(); | 47 | socket->close(); |
48 | auto resourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(identifier); | 48 | auto resourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(identifier); |
diff --git a/common/clientapi.h b/common/clientapi.h index 1bd8bdc..c098bb5 100644 --- a/common/clientapi.h +++ b/common/clientapi.h | |||
@@ -28,8 +28,10 @@ | |||
28 | #include <QEventLoop> | 28 | #include <QEventLoop> |
29 | #include <functional> | 29 | #include <functional> |
30 | #include <memory> | 30 | #include <memory> |
31 | |||
32 | #include <Async/Async> | ||
33 | |||
31 | #include "threadboundary.h" | 34 | #include "threadboundary.h" |
32 | #include "async/src/async.h" | ||
33 | #include "resultprovider.h" | 35 | #include "resultprovider.h" |
34 | 36 | ||
35 | namespace async { | 37 | namespace async { |
@@ -228,10 +230,10 @@ class StoreFacade { | |||
228 | public: | 230 | public: |
229 | virtual ~StoreFacade(){}; | 231 | virtual ~StoreFacade(){}; |
230 | QByteArray type() const { return ApplicationDomain::getTypeName<DomainType>(); } | 232 | QByteArray type() const { return ApplicationDomain::getTypeName<DomainType>(); } |
231 | virtual Async::Job<void> create(const DomainType &domainObject) = 0; | 233 | virtual KAsync::Job<void> create(const DomainType &domainObject) = 0; |
232 | virtual Async::Job<void> modify(const DomainType &domainObject) = 0; | 234 | virtual KAsync::Job<void> modify(const DomainType &domainObject) = 0; |
233 | virtual Async::Job<void> remove(const DomainType &domainObject) = 0; | 235 | virtual KAsync::Job<void> remove(const DomainType &domainObject) = 0; |
234 | virtual Async::Job<void> load(const Query &query, const QSharedPointer<ResultProvider<typename DomainType::Ptr> > &resultProvider) = 0; | 236 | virtual KAsync::Job<void> load(const Query &query, const QSharedPointer<ResultProvider<typename DomainType::Ptr> > &resultProvider) = 0; |
235 | }; | 237 | }; |
236 | 238 | ||
237 | 239 | ||
@@ -341,8 +343,8 @@ public: | |||
341 | //The result provider must be threadsafe. | 343 | //The result provider must be threadsafe. |
342 | async::run([query, resultSet](){ | 344 | async::run([query, resultSet](){ |
343 | // Query all resources and aggregate results | 345 | // Query all resources and aggregate results |
344 | Async::iterate(query.resources) | 346 | KAsync::iterate(query.resources) |
345 | .template each<void, QByteArray>([query, resultSet](const QByteArray &resource, Async::Future<void> &future) { | 347 | .template each<void, QByteArray>([query, resultSet](const QByteArray &resource, KAsync::Future<void> &future) { |
346 | //TODO pass resource identifier to factory | 348 | //TODO pass resource identifier to factory |
347 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); | 349 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); |
348 | if (facade) { | 350 | if (facade) { |
diff --git a/common/facade.h b/common/facade.h index 8c6578f..5743aa2 100644 --- a/common/facade.h +++ b/common/facade.h | |||
@@ -23,7 +23,8 @@ | |||
23 | 23 | ||
24 | #include <QByteArray> | 24 | #include <QByteArray> |
25 | 25 | ||
26 | #include "async/src/async.h" | 26 | #include <Async/Async> |
27 | |||
27 | #include "resourceaccess.h" | 28 | #include "resourceaccess.h" |
28 | #include "commands.h" | 29 | #include "commands.h" |
29 | #include "createentity_generated.h" | 30 | #include "createentity_generated.h" |
@@ -44,13 +45,13 @@ class QueryRunner : public QObject | |||
44 | { | 45 | { |
45 | Q_OBJECT | 46 | Q_OBJECT |
46 | public: | 47 | public: |
47 | typedef std::function<Async::Job<qint64>(qint64 oldRevision, qint64 newRevision)> QueryFunction; | 48 | typedef std::function<KAsync::Job<qint64>(qint64 oldRevision, qint64 newRevision)> QueryFunction; |
48 | 49 | ||
49 | QueryRunner(const Akonadi2::Query &query) : mLatestRevision(0) {}; | 50 | QueryRunner(const Akonadi2::Query &query) : mLatestRevision(0) {}; |
50 | /** | 51 | /** |
51 | * Starts query | 52 | * Starts query |
52 | */ | 53 | */ |
53 | Async::Job<void> run(qint64 newRevision = 0) | 54 | KAsync::Job<void> run(qint64 newRevision = 0) |
54 | { | 55 | { |
55 | //TODO: JOBAPI: that last empty .then should not be necessary | 56 | //TODO: JOBAPI: that last empty .then should not be necessary |
56 | return queryFunction(mLatestRevision, newRevision).then<void, qint64>([this](qint64 revision) { | 57 | return queryFunction(mLatestRevision, newRevision).then<void, qint64>([this](qint64 revision) { |
@@ -120,7 +121,7 @@ public: | |||
120 | return Akonadi2::ApplicationDomain::getTypeName<DomainType>(); | 121 | return Akonadi2::ApplicationDomain::getTypeName<DomainType>(); |
121 | } | 122 | } |
122 | 123 | ||
123 | Async::Job<void> create(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE | 124 | KAsync::Job<void> create(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE |
124 | { | 125 | { |
125 | if (!mDomainTypeAdaptorFactory) { | 126 | if (!mDomainTypeAdaptorFactory) { |
126 | Warning() << "No domain type adaptor factory available"; | 127 | Warning() << "No domain type adaptor factory available"; |
@@ -130,25 +131,25 @@ public: | |||
130 | return sendCreateCommand(bufferTypeForDomainType(), QByteArray::fromRawData(reinterpret_cast<const char*>(entityFbb.GetBufferPointer()), entityFbb.GetSize())); | 131 | return sendCreateCommand(bufferTypeForDomainType(), QByteArray::fromRawData(reinterpret_cast<const char*>(entityFbb.GetBufferPointer()), entityFbb.GetSize())); |
131 | } | 132 | } |
132 | 133 | ||
133 | Async::Job<void> modify(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE | 134 | KAsync::Job<void> modify(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE |
134 | { | 135 | { |
135 | //TODO | 136 | //TODO |
136 | return Async::null<void>(); | 137 | return KAsync::null<void>(); |
137 | } | 138 | } |
138 | 139 | ||
139 | Async::Job<void> remove(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE | 140 | KAsync::Job<void> remove(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE |
140 | { | 141 | { |
141 | //TODO | 142 | //TODO |
142 | return Async::null<void>(); | 143 | return KAsync::null<void>(); |
143 | } | 144 | } |
144 | 145 | ||
145 | //TODO JOBAPI return job from sync continuation to execute it as subjob? | 146 | //TODO JOBAPI return job from sync continuation to execute it as subjob? |
146 | Async::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > &resultProvider) Q_DECL_OVERRIDE | 147 | KAsync::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > &resultProvider) Q_DECL_OVERRIDE |
147 | { | 148 | { |
148 | auto runner = QSharedPointer<QueryRunner>::create(query); | 149 | auto runner = QSharedPointer<QueryRunner>::create(query); |
149 | QWeakPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > weakResultProvider = resultProvider; | 150 | QWeakPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > weakResultProvider = resultProvider; |
150 | runner->setQuery([this, weakResultProvider, query] (qint64 oldRevision, qint64 newRevision) -> Async::Job<qint64> { | 151 | runner->setQuery([this, weakResultProvider, query] (qint64 oldRevision, qint64 newRevision) -> KAsync::Job<qint64> { |
151 | return Async::start<qint64>([this, weakResultProvider, query, oldRevision, newRevision](Async::Future<qint64> &future) { | 152 | return KAsync::start<qint64>([this, weakResultProvider, query, oldRevision, newRevision](KAsync::Future<qint64> &future) { |
152 | auto resultProvider = weakResultProvider.toStrongRef(); | 153 | auto resultProvider = weakResultProvider.toStrongRef(); |
153 | if (!resultProvider) { | 154 | if (!resultProvider) { |
154 | Warning() << "Tried executing query after result provider is already gone"; | 155 | Warning() << "Tried executing query after result provider is already gone"; |
@@ -175,7 +176,7 @@ public: | |||
175 | } | 176 | } |
176 | 177 | ||
177 | //We have to capture the runner to keep it alive | 178 | //We have to capture the runner to keep it alive |
178 | return synchronizeResource(query.syncOnDemand, query.processAll).template then<void>([runner](Async::Future<void> &future) { | 179 | return synchronizeResource(query.syncOnDemand, query.processAll).template then<void>([runner](KAsync::Future<void> &future) { |
179 | runner->run().then<void>([&future]() { | 180 | runner->run().then<void>([&future]() { |
180 | future.setFinished(); | 181 | future.setFinished(); |
181 | }).exec(); | 182 | }).exec(); |
@@ -183,7 +184,7 @@ public: | |||
183 | } | 184 | } |
184 | 185 | ||
185 | protected: | 186 | protected: |
186 | Async::Job<void> sendCreateCommand(const QByteArray &resourceBufferType, const QByteArray &buffer) | 187 | KAsync::Job<void> sendCreateCommand(const QByteArray &resourceBufferType, const QByteArray &buffer) |
187 | { | 188 | { |
188 | flatbuffers::FlatBufferBuilder fbb; | 189 | flatbuffers::FlatBufferBuilder fbb; |
189 | //This is the resource buffer type and not the domain type | 190 | //This is the resource buffer type and not the domain type |
@@ -195,7 +196,7 @@ protected: | |||
195 | return mResourceAccess->sendCommand(Akonadi2::Commands::CreateEntityCommand, fbb); | 196 | return mResourceAccess->sendCommand(Akonadi2::Commands::CreateEntityCommand, fbb); |
196 | } | 197 | } |
197 | 198 | ||
198 | Async::Job<void> synchronizeResource(bool sync, bool processAll) | 199 | KAsync::Job<void> synchronizeResource(bool sync, bool processAll) |
199 | { | 200 | { |
200 | //TODO check if a sync is necessary | 201 | //TODO check if a sync is necessary |
201 | //TODO Only sync what was requested | 202 | //TODO Only sync what was requested |
@@ -203,17 +204,17 @@ protected: | |||
203 | //TODO the synchronization should normally not be necessary: We just return what is already available. | 204 | //TODO the synchronization should normally not be necessary: We just return what is already available. |
204 | 205 | ||
205 | if (sync || processAll) { | 206 | if (sync || processAll) { |
206 | return Async::start<void>([=](Async::Future<void> &future) { | 207 | return KAsync::start<void>([=](KAsync::Future<void> &future) { |
207 | mResourceAccess->open(); | 208 | mResourceAccess->open(); |
208 | mResourceAccess->synchronizeResource(sync, processAll).then<void>([&future]() { | 209 | mResourceAccess->synchronizeResource(sync, processAll).then<void>([&future]() { |
209 | future.setFinished(); | 210 | future.setFinished(); |
210 | }).exec(); | 211 | }).exec(); |
211 | }); | 212 | }); |
212 | } | 213 | } |
213 | return Async::null<void>(); | 214 | return KAsync::null<void>(); |
214 | } | 215 | } |
215 | 216 | ||
216 | virtual Async::Job<qint64> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr> > &resultProvider, qint64 oldRevision, qint64 newRevision) { return Async::null<qint64>(); }; | 217 | virtual KAsync::Job<qint64> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr> > &resultProvider, qint64 oldRevision, qint64 newRevision) { return KAsync::null<qint64>(); }; |
217 | 218 | ||
218 | protected: | 219 | protected: |
219 | //TODO use one resource access instance per application => make static | 220 | //TODO use one resource access instance per application => make static |
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index ea6413b..2394b80 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -61,33 +61,33 @@ private slots: | |||
61 | }).exec(); | 61 | }).exec(); |
62 | } | 62 | } |
63 | 63 | ||
64 | Async::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) | 64 | KAsync::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) |
65 | { | 65 | { |
66 | Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId()); | 66 | Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId()); |
67 | //Throw command into appropriate pipeline | 67 | //Throw command into appropriate pipeline |
68 | switch (queuedCommand->commandId()) { | 68 | switch (queuedCommand->commandId()) { |
69 | case Akonadi2::Commands::DeleteEntityCommand: | 69 | case Akonadi2::Commands::DeleteEntityCommand: |
70 | //mPipeline->removedEntity | 70 | //mPipeline->removedEntity |
71 | return Async::null<void>(); | 71 | return KAsync::null<void>(); |
72 | case Akonadi2::Commands::ModifyEntityCommand: | 72 | case Akonadi2::Commands::ModifyEntityCommand: |
73 | //mPipeline->modifiedEntity | 73 | //mPipeline->modifiedEntity |
74 | return Async::null<void>(); | 74 | return KAsync::null<void>(); |
75 | case Akonadi2::Commands::CreateEntityCommand: | 75 | case Akonadi2::Commands::CreateEntityCommand: |
76 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); | 76 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); |
77 | default: | 77 | default: |
78 | return Async::error<void>(-1, "Unhandled command"); | 78 | return KAsync::error<void>(-1, "Unhandled command"); |
79 | } | 79 | } |
80 | return Async::null<void>(); | 80 | return KAsync::null<void>(); |
81 | } | 81 | } |
82 | 82 | ||
83 | //Process all messages of this queue | 83 | //Process all messages of this queue |
84 | Async::Job<void> processQueue(MessageQueue *queue) | 84 | KAsync::Job<void> processQueue(MessageQueue *queue) |
85 | { | 85 | { |
86 | //TODO use something like: | 86 | //TODO use something like: |
87 | //Async::foreach("pass iterator here").each("process value here").join(); | 87 | //KAsync::foreach("pass iterator here").each("process value here").join(); |
88 | //Async::foreach("pass iterator here").parallel("process value here").join(); | 88 | //KAsync::foreach("pass iterator here").parallel("process value here").join(); |
89 | return Async::dowhile( | 89 | return KAsync::dowhile( |
90 | [this, queue](Async::Future<bool> &future) { | 90 | [this, queue](KAsync::Future<bool> &future) { |
91 | if (queue->isEmpty()) { | 91 | if (queue->isEmpty()) { |
92 | future.setValue(false); | 92 | future.setValue(false); |
93 | future.setFinished(); | 93 | future.setFinished(); |
@@ -133,13 +133,13 @@ private slots: | |||
133 | ); | 133 | ); |
134 | } | 134 | } |
135 | 135 | ||
136 | Async::Job<void> processPipeline() | 136 | KAsync::Job<void> processPipeline() |
137 | { | 137 | { |
138 | //Go through all message queues | 138 | //Go through all message queues |
139 | auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); | 139 | auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); |
140 | return Async::dowhile( | 140 | return KAsync::dowhile( |
141 | [it]() { return it->hasNext(); }, | 141 | [it]() { return it->hasNext(); }, |
142 | [it, this](Async::Future<void> &future) { | 142 | [it, this](KAsync::Future<void> &future) { |
143 | auto queue = it->next(); | 143 | auto queue = it->next(); |
144 | processQueue(queue).then<void>([&future]() { | 144 | processQueue(queue).then<void>([&future]() { |
145 | Trace() << "Queue processed"; | 145 | Trace() << "Queue processed"; |
@@ -206,12 +206,12 @@ void GenericResource::processCommand(int commandId, const QByteArray &data, uint | |||
206 | enqueueCommand(mUserQueue, commandId, data); | 206 | enqueueCommand(mUserQueue, commandId, data); |
207 | } | 207 | } |
208 | 208 | ||
209 | Async::Job<void> GenericResource::processAllMessages() | 209 | KAsync::Job<void> GenericResource::processAllMessages() |
210 | { | 210 | { |
211 | //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. | 211 | //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. |
212 | //TODO: report errors while processing sync? | 212 | //TODO: report errors while processing sync? |
213 | //TODO JOBAPI: A helper that waits for n events and then continues? | 213 | //TODO JOBAPI: A helper that waits for n events and then continues? |
214 | return Async::start<void>([this](Async::Future<void> &f) { | 214 | return KAsync::start<void>([this](KAsync::Future<void> &f) { |
215 | if (mSynchronizerQueue.isEmpty()) { | 215 | if (mSynchronizerQueue.isEmpty()) { |
216 | f.setFinished(); | 216 | f.setFinished(); |
217 | } else { | 217 | } else { |
@@ -219,7 +219,7 @@ Async::Job<void> GenericResource::processAllMessages() | |||
219 | f.setFinished(); | 219 | f.setFinished(); |
220 | }); | 220 | }); |
221 | } | 221 | } |
222 | }).then<void>([this](Async::Future<void> &f) { | 222 | }).then<void>([this](KAsync::Future<void> &f) { |
223 | if (mUserQueue.isEmpty()) { | 223 | if (mUserQueue.isEmpty()) { |
224 | f.setFinished(); | 224 | f.setFinished(); |
225 | } else { | 225 | } else { |
diff --git a/common/genericresource.h b/common/genericresource.h index 36fa567..ac28575 100644 --- a/common/genericresource.h +++ b/common/genericresource.h | |||
@@ -38,8 +38,8 @@ public: | |||
38 | virtual ~GenericResource(); | 38 | virtual ~GenericResource(); |
39 | 39 | ||
40 | virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline) Q_DECL_OVERRIDE; | 40 | virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline) Q_DECL_OVERRIDE; |
41 | virtual Async::Job<void> synchronizeWithSource(Pipeline *pipeline) Q_DECL_OVERRIDE = 0; | 41 | virtual KAsync::Job<void> synchronizeWithSource(Pipeline *pipeline) Q_DECL_OVERRIDE = 0; |
42 | virtual Async::Job<void> processAllMessages() Q_DECL_OVERRIDE; | 42 | virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE; |
43 | 43 | ||
44 | virtual void configurePipeline(Pipeline *pipeline) Q_DECL_OVERRIDE; | 44 | virtual void configurePipeline(Pipeline *pipeline) Q_DECL_OVERRIDE; |
45 | int error() const; | 45 | int error() const; |
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index e2f23ed..21cf1c5 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -29,7 +29,6 @@ | |||
29 | #include "metadata_generated.h" | 29 | #include "metadata_generated.h" |
30 | #include "createentity_generated.h" | 30 | #include "createentity_generated.h" |
31 | #include "entitybuffer.h" | 31 | #include "entitybuffer.h" |
32 | #include "async/src/async.h" | ||
33 | #include "log.h" | 32 | #include "log.h" |
34 | 33 | ||
35 | namespace Akonadi2 | 34 | namespace Akonadi2 |
@@ -94,7 +93,7 @@ void Pipeline::null() | |||
94 | // state.step(); | 93 | // state.step(); |
95 | } | 94 | } |
96 | 95 | ||
97 | Async::Job<void> Pipeline::newEntity(void const *command, size_t size) | 96 | KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size) |
98 | { | 97 | { |
99 | Log() << "Pipeline: New Entity"; | 98 | Log() << "Pipeline: New Entity"; |
100 | 99 | ||
@@ -107,7 +106,7 @@ Async::Job<void> Pipeline::newEntity(void const *command, size_t size) | |||
107 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 106 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
108 | if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) { | 107 | if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) { |
109 | qWarning() << "invalid buffer, not a create entity buffer"; | 108 | qWarning() << "invalid buffer, not a create entity buffer"; |
110 | return Async::error<void>(); | 109 | return KAsync::error<void>(); |
111 | } | 110 | } |
112 | } | 111 | } |
113 | auto createEntity = Akonadi2::Commands::GetCreateEntity(command); | 112 | auto createEntity = Akonadi2::Commands::GetCreateEntity(command); |
@@ -118,7 +117,7 @@ Async::Job<void> Pipeline::newEntity(void const *command, size_t size) | |||
118 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); | 117 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); |
119 | if (!Akonadi2::VerifyEntityBuffer(verifyer)) { | 118 | if (!Akonadi2::VerifyEntityBuffer(verifyer)) { |
120 | qWarning() << "invalid buffer, not an entity buffer"; | 119 | qWarning() << "invalid buffer, not an entity buffer"; |
121 | return Async::error<void>(); | 120 | return KAsync::error<void>(); |
122 | } | 121 | } |
123 | } | 122 | } |
124 | auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); | 123 | auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); |
@@ -139,7 +138,7 @@ Async::Job<void> Pipeline::newEntity(void const *command, size_t size) | |||
139 | storage().setMaxRevision(newRevision); | 138 | storage().setMaxRevision(newRevision); |
140 | Log() << "Pipeline: wrote entity: "<< newRevision; | 139 | Log() << "Pipeline: wrote entity: "<< newRevision; |
141 | 140 | ||
142 | return Async::start<void>([this, key, entityType](Async::Future<void> &future) { | 141 | return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) { |
143 | PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() { | 142 | PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() { |
144 | future.setFinished(); | 143 | future.setFinished(); |
145 | }); | 144 | }); |
diff --git a/common/pipeline.h b/common/pipeline.h index a574d27..b695bde 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -27,7 +27,8 @@ | |||
27 | 27 | ||
28 | #include <akonadi2common_export.h> | 28 | #include <akonadi2common_export.h> |
29 | #include <storage.h> | 29 | #include <storage.h> |
30 | #include "async/src/async.h" | 30 | |
31 | #include <Async/Async> | ||
31 | 32 | ||
32 | #include "entity_generated.h" | 33 | #include "entity_generated.h" |
33 | 34 | ||
@@ -53,7 +54,7 @@ public: | |||
53 | 54 | ||
54 | void null(); | 55 | void null(); |
55 | 56 | ||
56 | Async::Job<void> newEntity(void const *command, size_t size); | 57 | KAsync::Job<void> newEntity(void const *command, size_t size); |
57 | void modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size); | 58 | void modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size); |
58 | void deletedEntity(const QString &entityType, const QByteArray &key); | 59 | void deletedEntity(const QString &entityType, const QByteArray &key); |
59 | 60 | ||
diff --git a/common/resource.cpp b/common/resource.cpp index e158a40..bd69afd 100644 --- a/common/resource.cpp +++ b/common/resource.cpp | |||
@@ -53,16 +53,16 @@ void Resource::processCommand(int commandId, const QByteArray &data, uint size, | |||
53 | pipeline->null(); | 53 | pipeline->null(); |
54 | } | 54 | } |
55 | 55 | ||
56 | Async::Job<void> Resource::synchronizeWithSource(Pipeline *pipeline) | 56 | KAsync::Job<void> Resource::synchronizeWithSource(Pipeline *pipeline) |
57 | { | 57 | { |
58 | return Async::start<void>([pipeline](Async::Future<void> &f) { | 58 | return KAsync::start<void>([pipeline](KAsync::Future<void> &f) { |
59 | pipeline->null(); | 59 | pipeline->null(); |
60 | }); | 60 | }); |
61 | } | 61 | } |
62 | 62 | ||
63 | Async::Job<void> Resource::processAllMessages() | 63 | KAsync::Job<void> Resource::processAllMessages() |
64 | { | 64 | { |
65 | return Async::null<void>(); | 65 | return KAsync::null<void>(); |
66 | } | 66 | } |
67 | 67 | ||
68 | class ResourceFactory::Private | 68 | class ResourceFactory::Private |
diff --git a/common/resource.h b/common/resource.h index 18a6827..ea1e9d8 100644 --- a/common/resource.h +++ b/common/resource.h | |||
@@ -21,7 +21,8 @@ | |||
21 | #include <akonadi2common_export.h> | 21 | #include <akonadi2common_export.h> |
22 | #include <clientapi.h> | 22 | #include <clientapi.h> |
23 | #include <pipeline.h> | 23 | #include <pipeline.h> |
24 | #include <async/src/async.h> | 24 | |
25 | #include <Async/Async> | ||
25 | 26 | ||
26 | namespace Akonadi2 | 27 | namespace Akonadi2 |
27 | { | 28 | { |
@@ -36,8 +37,8 @@ public: | |||
36 | virtual ~Resource(); | 37 | virtual ~Resource(); |
37 | 38 | ||
38 | virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); | 39 | virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); |
39 | virtual Async::Job<void> synchronizeWithSource(Pipeline *pipeline); | 40 | virtual KAsync::Job<void> synchronizeWithSource(Pipeline *pipeline); |
40 | virtual Async::Job<void> processAllMessages(); | 41 | virtual KAsync::Job<void> processAllMessages(); |
41 | 42 | ||
42 | virtual void configurePipeline(Pipeline *pipeline); | 43 | virtual void configurePipeline(Pipeline *pipeline); |
43 | 44 | ||
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index b7d569b..feffcf4 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -70,8 +70,8 @@ class ResourceAccess::Private | |||
70 | { | 70 | { |
71 | public: | 71 | public: |
72 | Private(const QByteArray &name, ResourceAccess *ra); | 72 | Private(const QByteArray &name, ResourceAccess *ra); |
73 | Async::Job<void> tryToConnect(); | 73 | KAsync::Job<void> tryToConnect(); |
74 | Async::Job<void> initializeSocket(); | 74 | KAsync::Job<void> initializeSocket(); |
75 | QByteArray resourceName; | 75 | QByteArray resourceName; |
76 | QSharedPointer<QLocalSocket> socket; | 76 | QSharedPointer<QLocalSocket> socket; |
77 | QByteArray partialMessageBuffer; | 77 | QByteArray partialMessageBuffer; |
@@ -89,10 +89,10 @@ ResourceAccess::Private::Private(const QByteArray &name, ResourceAccess *q) | |||
89 | } | 89 | } |
90 | 90 | ||
91 | //Connects to server and returns connected socket on success | 91 | //Connects to server and returns connected socket on success |
92 | Async::Job<QSharedPointer<QLocalSocket> > ResourceAccess::connectToServer(const QByteArray &identifier) | 92 | KAsync::Job<QSharedPointer<QLocalSocket> > ResourceAccess::connectToServer(const QByteArray &identifier) |
93 | { | 93 | { |
94 | auto s = QSharedPointer<QLocalSocket>::create(); | 94 | auto s = QSharedPointer<QLocalSocket>::create(); |
95 | return Async::start<QSharedPointer<QLocalSocket> >([identifier, s](Async::Future<QSharedPointer<QLocalSocket> > &future) { | 95 | return KAsync::start<QSharedPointer<QLocalSocket> >([identifier, s](KAsync::Future<QSharedPointer<QLocalSocket> > &future) { |
96 | s->setServerName(identifier); | 96 | s->setServerName(identifier); |
97 | auto context = new QObject; | 97 | auto context = new QObject; |
98 | QObject::connect(s.data(), &QLocalSocket::connected, context, [&future, &s, context]() { | 98 | QObject::connect(s.data(), &QLocalSocket::connected, context, [&future, &s, context]() { |
@@ -109,15 +109,15 @@ Async::Job<QSharedPointer<QLocalSocket> > ResourceAccess::connectToServer(const | |||
109 | }); | 109 | }); |
110 | } | 110 | } |
111 | 111 | ||
112 | Async::Job<void> ResourceAccess::Private::tryToConnect() | 112 | KAsync::Job<void> ResourceAccess::Private::tryToConnect() |
113 | { | 113 | { |
114 | return Async::dowhile([this]() -> bool { | 114 | return KAsync::dowhile([this]() -> bool { |
115 | //TODO abort after N retries? | 115 | //TODO abort after N retries? |
116 | return !socket; | 116 | return !socket; |
117 | }, | 117 | }, |
118 | [this](Async::Future<void> &future) { | 118 | [this](KAsync::Future<void> &future) { |
119 | Trace() << "Loop"; | 119 | Trace() << "Loop"; |
120 | Async::wait(50) | 120 | KAsync::wait(50) |
121 | .then(connectToServer(resourceName)) | 121 | .then(connectToServer(resourceName)) |
122 | .then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) { | 122 | .then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) { |
123 | Q_ASSERT(s); | 123 | Q_ASSERT(s); |
@@ -130,9 +130,9 @@ Async::Job<void> ResourceAccess::Private::tryToConnect() | |||
130 | }); | 130 | }); |
131 | } | 131 | } |
132 | 132 | ||
133 | Async::Job<void> ResourceAccess::Private::initializeSocket() | 133 | KAsync::Job<void> ResourceAccess::Private::initializeSocket() |
134 | { | 134 | { |
135 | return Async::start<void>([this](Async::Future<void> &future) { | 135 | return KAsync::start<void>([this](KAsync::Future<void> &future) { |
136 | Trace() << "Trying to connect"; | 136 | Trace() << "Trying to connect"; |
137 | connectToServer(resourceName).then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) { | 137 | connectToServer(resourceName).then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) { |
138 | Trace() << "Connected to resource, without having to start it."; | 138 | Trace() << "Connected to resource, without having to start it."; |
@@ -189,9 +189,9 @@ void ResourceAccess::registerCallback(uint messageId, const std::function<void(i | |||
189 | d->resultHandler.insert(messageId, callback); | 189 | d->resultHandler.insert(messageId, callback); |
190 | } | 190 | } |
191 | 191 | ||
192 | Async::Job<void> ResourceAccess::sendCommand(int commandId) | 192 | KAsync::Job<void> ResourceAccess::sendCommand(int commandId) |
193 | { | 193 | { |
194 | return Async::start<void>([this, commandId](Async::Future<void> &f) { | 194 | return KAsync::start<void>([this, commandId](KAsync::Future<void> &f) { |
195 | auto continuation = [&f](int error, const QString &errorMessage) { | 195 | auto continuation = [&f](int error, const QString &errorMessage) { |
196 | if (error) { | 196 | if (error) { |
197 | f.setError(error, errorMessage); | 197 | f.setError(error, errorMessage); |
@@ -205,11 +205,11 @@ Async::Job<void> ResourceAccess::sendCommand(int commandId) | |||
205 | }); | 205 | }); |
206 | } | 206 | } |
207 | 207 | ||
208 | Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) | 208 | KAsync::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) |
209 | { | 209 | { |
210 | //The flatbuffer is transient, but we want to store it until the job is executed | 210 | //The flatbuffer is transient, but we want to store it until the job is executed |
211 | QByteArray buffer(reinterpret_cast<const char*>(fbb.GetBufferPointer()), fbb.GetSize()); | 211 | QByteArray buffer(reinterpret_cast<const char*>(fbb.GetBufferPointer()), fbb.GetSize()); |
212 | return Async::start<void>([commandId, buffer, this](Async::Future<void> &f) { | 212 | return KAsync::start<void>([commandId, buffer, this](KAsync::Future<void> &f) { |
213 | auto callback = [&f](int error, const QString &errorMessage) { | 213 | auto callback = [&f](int error, const QString &errorMessage) { |
214 | if (error) { | 214 | if (error) { |
215 | f.setError(error, errorMessage); | 215 | f.setError(error, errorMessage); |
@@ -225,7 +225,7 @@ Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu | |||
225 | }); | 225 | }); |
226 | } | 226 | } |
227 | 227 | ||
228 | Async::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool localSync) | 228 | KAsync::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool localSync) |
229 | { | 229 | { |
230 | auto command = Akonadi2::CreateSynchronize(d->fbb, sourceSync, localSync); | 230 | auto command = Akonadi2::CreateSynchronize(d->fbb, sourceSync, localSync); |
231 | Akonadi2::FinishSynchronizeBuffer(d->fbb, command); | 231 | Akonadi2::FinishSynchronizeBuffer(d->fbb, command); |
diff --git a/common/resourceaccess.h b/common/resourceaccess.h index c16a9d2..b779db9 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h | |||
@@ -24,8 +24,9 @@ | |||
24 | #include <QObject> | 24 | #include <QObject> |
25 | #include <QTimer> | 25 | #include <QTimer> |
26 | 26 | ||
27 | #include <Async/Async> | ||
28 | |||
27 | #include <flatbuffers/flatbuffers.h> | 29 | #include <flatbuffers/flatbuffers.h> |
28 | #include <async/src/async.h> | ||
29 | 30 | ||
30 | namespace Akonadi2 | 31 | namespace Akonadi2 |
31 | { | 32 | { |
@@ -43,13 +44,13 @@ public: | |||
43 | QByteArray resourceName() const; | 44 | QByteArray resourceName() const; |
44 | bool isReady() const; | 45 | bool isReady() const; |
45 | 46 | ||
46 | Async::Job<void> sendCommand(int commandId); | 47 | KAsync::Job<void> sendCommand(int commandId); |
47 | Async::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); | 48 | KAsync::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); |
48 | Async::Job<void> synchronizeResource(bool remoteSync, bool localSync); | 49 | KAsync::Job<void> synchronizeResource(bool remoteSync, bool localSync); |
49 | /** | 50 | /** |
50 | * Tries to connect to server, and returns a connected socket on success. | 51 | * Tries to connect to server, and returns a connected socket on success. |
51 | */ | 52 | */ |
52 | static Async::Job<QSharedPointer<QLocalSocket> > connectToServer(const QByteArray &identifier); | 53 | static KAsync::Job<QSharedPointer<QLocalSocket> > connectToServer(const QByteArray &identifier); |
53 | 54 | ||
54 | public Q_SLOTS: | 55 | public Q_SLOTS: |
55 | void open(); | 56 | void open(); |