diff options
Diffstat (limited to 'common')
-rw-r--r-- | common/CMakeLists.txt | 2 | ||||
-rw-r--r-- | common/clientapi.cpp | 2 | ||||
-rw-r--r-- | common/clientapi.h | 12 | ||||
-rw-r--r-- | common/facade.h | 32 | ||||
-rw-r--r-- | common/genericresource.cpp | 32 | ||||
-rw-r--r-- | common/genericresource.h | 4 | ||||
-rw-r--r-- | common/pipeline.cpp | 8 | ||||
-rw-r--r-- | common/pipeline.h | 2 | ||||
-rw-r--r-- | common/resource.cpp | 8 | ||||
-rw-r--r-- | common/resource.h | 4 | ||||
-rw-r--r-- | common/resourceaccess.cpp | 30 | ||||
-rw-r--r-- | common/resourceaccess.h | 8 |
12 files changed, 72 insertions, 72 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 19b23c8..4fb8a67 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt | |||
@@ -50,5 +50,5 @@ generate_flatbuffers( | |||
50 | generate_export_header(${PROJECT_NAME} BASE_NAME Akonadi2Common EXPORT_FILE_NAME akonadi2common_export.h) | 50 | generate_export_header(${PROJECT_NAME} BASE_NAME Akonadi2Common EXPORT_FILE_NAME akonadi2common_export.h) |
51 | SET_TARGET_PROPERTIES(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX) | 51 | SET_TARGET_PROPERTIES(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX) |
52 | qt5_use_modules(${PROJECT_NAME} Network) | 52 | qt5_use_modules(${PROJECT_NAME} Network) |
53 | target_link_libraries(${PROJECT_NAME} ${storage_LIBS} akonadi2async) | 53 | target_link_libraries(${PROJECT_NAME} ${storage_LIBS} KF5Async) |
54 | install(TARGETS ${PROJECT_NAME} ${KDE_INSTALL_TARGETS_DEFAULT_ARGS}) | 54 | install(TARGETS ${PROJECT_NAME} ${KDE_INSTALL_TARGETS_DEFAULT_ARGS}) |
diff --git a/common/clientapi.cpp b/common/clientapi.cpp index e4608c8..d287fcf 100644 --- a/common/clientapi.cpp +++ b/common/clientapi.cpp | |||
@@ -42,7 +42,7 @@ QByteArray getTypeName<AkonadiResource>() | |||
42 | void Store::shutdown(const QByteArray &identifier) | 42 | void Store::shutdown(const QByteArray &identifier) |
43 | { | 43 | { |
44 | Trace() << "shutdown"; | 44 | Trace() << "shutdown"; |
45 | ResourceAccess::connectToServer(identifier).then<void, QSharedPointer<QLocalSocket>>([identifier](const QSharedPointer<QLocalSocket> &socket, Async::Future<void> &future) { | 45 | ResourceAccess::connectToServer(identifier).then<void, QSharedPointer<QLocalSocket>>([identifier](const QSharedPointer<QLocalSocket> &socket, KAsync::Future<void> &future) { |
46 | //We can't currently reuse the socket | 46 | //We can't currently reuse the socket |
47 | socket->close(); | 47 | socket->close(); |
48 | auto resourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(identifier); | 48 | auto resourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(identifier); |
diff --git a/common/clientapi.h b/common/clientapi.h index 1bd8bdc..0ce1691 100644 --- a/common/clientapi.h +++ b/common/clientapi.h | |||
@@ -228,10 +228,10 @@ class StoreFacade { | |||
228 | public: | 228 | public: |
229 | virtual ~StoreFacade(){}; | 229 | virtual ~StoreFacade(){}; |
230 | QByteArray type() const { return ApplicationDomain::getTypeName<DomainType>(); } | 230 | QByteArray type() const { return ApplicationDomain::getTypeName<DomainType>(); } |
231 | virtual Async::Job<void> create(const DomainType &domainObject) = 0; | 231 | virtual KAsync::Job<void> create(const DomainType &domainObject) = 0; |
232 | virtual Async::Job<void> modify(const DomainType &domainObject) = 0; | 232 | virtual KAsync::Job<void> modify(const DomainType &domainObject) = 0; |
233 | virtual Async::Job<void> remove(const DomainType &domainObject) = 0; | 233 | virtual KAsync::Job<void> remove(const DomainType &domainObject) = 0; |
234 | virtual Async::Job<void> load(const Query &query, const QSharedPointer<ResultProvider<typename DomainType::Ptr> > &resultProvider) = 0; | 234 | virtual KAsync::Job<void> load(const Query &query, const QSharedPointer<ResultProvider<typename DomainType::Ptr> > &resultProvider) = 0; |
235 | }; | 235 | }; |
236 | 236 | ||
237 | 237 | ||
@@ -341,8 +341,8 @@ public: | |||
341 | //The result provider must be threadsafe. | 341 | //The result provider must be threadsafe. |
342 | async::run([query, resultSet](){ | 342 | async::run([query, resultSet](){ |
343 | // Query all resources and aggregate results | 343 | // Query all resources and aggregate results |
344 | Async::iterate(query.resources) | 344 | KAsync::iterate(query.resources) |
345 | .template each<void, QByteArray>([query, resultSet](const QByteArray &resource, Async::Future<void> &future) { | 345 | .template each<void, QByteArray>([query, resultSet](const QByteArray &resource, KAsync::Future<void> &future) { |
346 | //TODO pass resource identifier to factory | 346 | //TODO pass resource identifier to factory |
347 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); | 347 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); |
348 | if (facade) { | 348 | if (facade) { |
diff --git a/common/facade.h b/common/facade.h index 8c6578f..dcb30b6 100644 --- a/common/facade.h +++ b/common/facade.h | |||
@@ -44,13 +44,13 @@ class QueryRunner : public QObject | |||
44 | { | 44 | { |
45 | Q_OBJECT | 45 | Q_OBJECT |
46 | public: | 46 | public: |
47 | typedef std::function<Async::Job<qint64>(qint64 oldRevision, qint64 newRevision)> QueryFunction; | 47 | typedef std::function<KAsync::Job<qint64>(qint64 oldRevision, qint64 newRevision)> QueryFunction; |
48 | 48 | ||
49 | QueryRunner(const Akonadi2::Query &query) : mLatestRevision(0) {}; | 49 | QueryRunner(const Akonadi2::Query &query) : mLatestRevision(0) {}; |
50 | /** | 50 | /** |
51 | * Starts query | 51 | * Starts query |
52 | */ | 52 | */ |
53 | Async::Job<void> run(qint64 newRevision = 0) | 53 | KAsync::Job<void> run(qint64 newRevision = 0) |
54 | { | 54 | { |
55 | //TODO: JOBAPI: that last empty .then should not be necessary | 55 | //TODO: JOBAPI: that last empty .then should not be necessary |
56 | return queryFunction(mLatestRevision, newRevision).then<void, qint64>([this](qint64 revision) { | 56 | return queryFunction(mLatestRevision, newRevision).then<void, qint64>([this](qint64 revision) { |
@@ -120,7 +120,7 @@ public: | |||
120 | return Akonadi2::ApplicationDomain::getTypeName<DomainType>(); | 120 | return Akonadi2::ApplicationDomain::getTypeName<DomainType>(); |
121 | } | 121 | } |
122 | 122 | ||
123 | Async::Job<void> create(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE | 123 | KAsync::Job<void> create(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE |
124 | { | 124 | { |
125 | if (!mDomainTypeAdaptorFactory) { | 125 | if (!mDomainTypeAdaptorFactory) { |
126 | Warning() << "No domain type adaptor factory available"; | 126 | Warning() << "No domain type adaptor factory available"; |
@@ -130,25 +130,25 @@ public: | |||
130 | return sendCreateCommand(bufferTypeForDomainType(), QByteArray::fromRawData(reinterpret_cast<const char*>(entityFbb.GetBufferPointer()), entityFbb.GetSize())); | 130 | return sendCreateCommand(bufferTypeForDomainType(), QByteArray::fromRawData(reinterpret_cast<const char*>(entityFbb.GetBufferPointer()), entityFbb.GetSize())); |
131 | } | 131 | } |
132 | 132 | ||
133 | Async::Job<void> modify(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE | 133 | KAsync::Job<void> modify(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE |
134 | { | 134 | { |
135 | //TODO | 135 | //TODO |
136 | return Async::null<void>(); | 136 | return KAsync::null<void>(); |
137 | } | 137 | } |
138 | 138 | ||
139 | Async::Job<void> remove(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE | 139 | KAsync::Job<void> remove(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE |
140 | { | 140 | { |
141 | //TODO | 141 | //TODO |
142 | return Async::null<void>(); | 142 | return KAsync::null<void>(); |
143 | } | 143 | } |
144 | 144 | ||
145 | //TODO JOBAPI return job from sync continuation to execute it as subjob? | 145 | //TODO JOBAPI return job from sync continuation to execute it as subjob? |
146 | Async::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > &resultProvider) Q_DECL_OVERRIDE | 146 | KAsync::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > &resultProvider) Q_DECL_OVERRIDE |
147 | { | 147 | { |
148 | auto runner = QSharedPointer<QueryRunner>::create(query); | 148 | auto runner = QSharedPointer<QueryRunner>::create(query); |
149 | QWeakPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > weakResultProvider = resultProvider; | 149 | QWeakPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > weakResultProvider = resultProvider; |
150 | runner->setQuery([this, weakResultProvider, query] (qint64 oldRevision, qint64 newRevision) -> Async::Job<qint64> { | 150 | runner->setQuery([this, weakResultProvider, query] (qint64 oldRevision, qint64 newRevision) -> KAsync::Job<qint64> { |
151 | return Async::start<qint64>([this, weakResultProvider, query, oldRevision, newRevision](Async::Future<qint64> &future) { | 151 | return KAsync::start<qint64>([this, weakResultProvider, query, oldRevision, newRevision](KAsync::Future<qint64> &future) { |
152 | auto resultProvider = weakResultProvider.toStrongRef(); | 152 | auto resultProvider = weakResultProvider.toStrongRef(); |
153 | if (!resultProvider) { | 153 | if (!resultProvider) { |
154 | Warning() << "Tried executing query after result provider is already gone"; | 154 | Warning() << "Tried executing query after result provider is already gone"; |
@@ -175,7 +175,7 @@ public: | |||
175 | } | 175 | } |
176 | 176 | ||
177 | //We have to capture the runner to keep it alive | 177 | //We have to capture the runner to keep it alive |
178 | return synchronizeResource(query.syncOnDemand, query.processAll).template then<void>([runner](Async::Future<void> &future) { | 178 | return synchronizeResource(query.syncOnDemand, query.processAll).template then<void>([runner](KAsync::Future<void> &future) { |
179 | runner->run().then<void>([&future]() { | 179 | runner->run().then<void>([&future]() { |
180 | future.setFinished(); | 180 | future.setFinished(); |
181 | }).exec(); | 181 | }).exec(); |
@@ -183,7 +183,7 @@ public: | |||
183 | } | 183 | } |
184 | 184 | ||
185 | protected: | 185 | protected: |
186 | Async::Job<void> sendCreateCommand(const QByteArray &resourceBufferType, const QByteArray &buffer) | 186 | KAsync::Job<void> sendCreateCommand(const QByteArray &resourceBufferType, const QByteArray &buffer) |
187 | { | 187 | { |
188 | flatbuffers::FlatBufferBuilder fbb; | 188 | flatbuffers::FlatBufferBuilder fbb; |
189 | //This is the resource buffer type and not the domain type | 189 | //This is the resource buffer type and not the domain type |
@@ -195,7 +195,7 @@ protected: | |||
195 | return mResourceAccess->sendCommand(Akonadi2::Commands::CreateEntityCommand, fbb); | 195 | return mResourceAccess->sendCommand(Akonadi2::Commands::CreateEntityCommand, fbb); |
196 | } | 196 | } |
197 | 197 | ||
198 | Async::Job<void> synchronizeResource(bool sync, bool processAll) | 198 | KAsync::Job<void> synchronizeResource(bool sync, bool processAll) |
199 | { | 199 | { |
200 | //TODO check if a sync is necessary | 200 | //TODO check if a sync is necessary |
201 | //TODO Only sync what was requested | 201 | //TODO Only sync what was requested |
@@ -203,17 +203,17 @@ protected: | |||
203 | //TODO the synchronization should normally not be necessary: We just return what is already available. | 203 | //TODO the synchronization should normally not be necessary: We just return what is already available. |
204 | 204 | ||
205 | if (sync || processAll) { | 205 | if (sync || processAll) { |
206 | return Async::start<void>([=](Async::Future<void> &future) { | 206 | return KAsync::start<void>([=](KAsync::Future<void> &future) { |
207 | mResourceAccess->open(); | 207 | mResourceAccess->open(); |
208 | mResourceAccess->synchronizeResource(sync, processAll).then<void>([&future]() { | 208 | mResourceAccess->synchronizeResource(sync, processAll).then<void>([&future]() { |
209 | future.setFinished(); | 209 | future.setFinished(); |
210 | }).exec(); | 210 | }).exec(); |
211 | }); | 211 | }); |
212 | } | 212 | } |
213 | return Async::null<void>(); | 213 | return KAsync::null<void>(); |
214 | } | 214 | } |
215 | 215 | ||
216 | virtual Async::Job<qint64> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr> > &resultProvider, qint64 oldRevision, qint64 newRevision) { return Async::null<qint64>(); }; | 216 | virtual KAsync::Job<qint64> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr> > &resultProvider, qint64 oldRevision, qint64 newRevision) { return KAsync::null<qint64>(); }; |
217 | 217 | ||
218 | protected: | 218 | protected: |
219 | //TODO use one resource access instance per application => make static | 219 | //TODO use one resource access instance per application => make static |
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index ea6413b..2394b80 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -61,33 +61,33 @@ private slots: | |||
61 | }).exec(); | 61 | }).exec(); |
62 | } | 62 | } |
63 | 63 | ||
64 | Async::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) | 64 | KAsync::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) |
65 | { | 65 | { |
66 | Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId()); | 66 | Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId()); |
67 | //Throw command into appropriate pipeline | 67 | //Throw command into appropriate pipeline |
68 | switch (queuedCommand->commandId()) { | 68 | switch (queuedCommand->commandId()) { |
69 | case Akonadi2::Commands::DeleteEntityCommand: | 69 | case Akonadi2::Commands::DeleteEntityCommand: |
70 | //mPipeline->removedEntity | 70 | //mPipeline->removedEntity |
71 | return Async::null<void>(); | 71 | return KAsync::null<void>(); |
72 | case Akonadi2::Commands::ModifyEntityCommand: | 72 | case Akonadi2::Commands::ModifyEntityCommand: |
73 | //mPipeline->modifiedEntity | 73 | //mPipeline->modifiedEntity |
74 | return Async::null<void>(); | 74 | return KAsync::null<void>(); |
75 | case Akonadi2::Commands::CreateEntityCommand: | 75 | case Akonadi2::Commands::CreateEntityCommand: |
76 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); | 76 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); |
77 | default: | 77 | default: |
78 | return Async::error<void>(-1, "Unhandled command"); | 78 | return KAsync::error<void>(-1, "Unhandled command"); |
79 | } | 79 | } |
80 | return Async::null<void>(); | 80 | return KAsync::null<void>(); |
81 | } | 81 | } |
82 | 82 | ||
83 | //Process all messages of this queue | 83 | //Process all messages of this queue |
84 | Async::Job<void> processQueue(MessageQueue *queue) | 84 | KAsync::Job<void> processQueue(MessageQueue *queue) |
85 | { | 85 | { |
86 | //TODO use something like: | 86 | //TODO use something like: |
87 | //Async::foreach("pass iterator here").each("process value here").join(); | 87 | //KAsync::foreach("pass iterator here").each("process value here").join(); |
88 | //Async::foreach("pass iterator here").parallel("process value here").join(); | 88 | //KAsync::foreach("pass iterator here").parallel("process value here").join(); |
89 | return Async::dowhile( | 89 | return KAsync::dowhile( |
90 | [this, queue](Async::Future<bool> &future) { | 90 | [this, queue](KAsync::Future<bool> &future) { |
91 | if (queue->isEmpty()) { | 91 | if (queue->isEmpty()) { |
92 | future.setValue(false); | 92 | future.setValue(false); |
93 | future.setFinished(); | 93 | future.setFinished(); |
@@ -133,13 +133,13 @@ private slots: | |||
133 | ); | 133 | ); |
134 | } | 134 | } |
135 | 135 | ||
136 | Async::Job<void> processPipeline() | 136 | KAsync::Job<void> processPipeline() |
137 | { | 137 | { |
138 | //Go through all message queues | 138 | //Go through all message queues |
139 | auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); | 139 | auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); |
140 | return Async::dowhile( | 140 | return KAsync::dowhile( |
141 | [it]() { return it->hasNext(); }, | 141 | [it]() { return it->hasNext(); }, |
142 | [it, this](Async::Future<void> &future) { | 142 | [it, this](KAsync::Future<void> &future) { |
143 | auto queue = it->next(); | 143 | auto queue = it->next(); |
144 | processQueue(queue).then<void>([&future]() { | 144 | processQueue(queue).then<void>([&future]() { |
145 | Trace() << "Queue processed"; | 145 | Trace() << "Queue processed"; |
@@ -206,12 +206,12 @@ void GenericResource::processCommand(int commandId, const QByteArray &data, uint | |||
206 | enqueueCommand(mUserQueue, commandId, data); | 206 | enqueueCommand(mUserQueue, commandId, data); |
207 | } | 207 | } |
208 | 208 | ||
209 | Async::Job<void> GenericResource::processAllMessages() | 209 | KAsync::Job<void> GenericResource::processAllMessages() |
210 | { | 210 | { |
211 | //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. | 211 | //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. |
212 | //TODO: report errors while processing sync? | 212 | //TODO: report errors while processing sync? |
213 | //TODO JOBAPI: A helper that waits for n events and then continues? | 213 | //TODO JOBAPI: A helper that waits for n events and then continues? |
214 | return Async::start<void>([this](Async::Future<void> &f) { | 214 | return KAsync::start<void>([this](KAsync::Future<void> &f) { |
215 | if (mSynchronizerQueue.isEmpty()) { | 215 | if (mSynchronizerQueue.isEmpty()) { |
216 | f.setFinished(); | 216 | f.setFinished(); |
217 | } else { | 217 | } else { |
@@ -219,7 +219,7 @@ Async::Job<void> GenericResource::processAllMessages() | |||
219 | f.setFinished(); | 219 | f.setFinished(); |
220 | }); | 220 | }); |
221 | } | 221 | } |
222 | }).then<void>([this](Async::Future<void> &f) { | 222 | }).then<void>([this](KAsync::Future<void> &f) { |
223 | if (mUserQueue.isEmpty()) { | 223 | if (mUserQueue.isEmpty()) { |
224 | f.setFinished(); | 224 | f.setFinished(); |
225 | } else { | 225 | } else { |
diff --git a/common/genericresource.h b/common/genericresource.h index 36fa567..ac28575 100644 --- a/common/genericresource.h +++ b/common/genericresource.h | |||
@@ -38,8 +38,8 @@ public: | |||
38 | virtual ~GenericResource(); | 38 | virtual ~GenericResource(); |
39 | 39 | ||
40 | virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline) Q_DECL_OVERRIDE; | 40 | virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline) Q_DECL_OVERRIDE; |
41 | virtual Async::Job<void> synchronizeWithSource(Pipeline *pipeline) Q_DECL_OVERRIDE = 0; | 41 | virtual KAsync::Job<void> synchronizeWithSource(Pipeline *pipeline) Q_DECL_OVERRIDE = 0; |
42 | virtual Async::Job<void> processAllMessages() Q_DECL_OVERRIDE; | 42 | virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE; |
43 | 43 | ||
44 | virtual void configurePipeline(Pipeline *pipeline) Q_DECL_OVERRIDE; | 44 | virtual void configurePipeline(Pipeline *pipeline) Q_DECL_OVERRIDE; |
45 | int error() const; | 45 | int error() const; |
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index e2f23ed..ea82720 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -94,7 +94,7 @@ void Pipeline::null() | |||
94 | // state.step(); | 94 | // state.step(); |
95 | } | 95 | } |
96 | 96 | ||
97 | Async::Job<void> Pipeline::newEntity(void const *command, size_t size) | 97 | KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size) |
98 | { | 98 | { |
99 | Log() << "Pipeline: New Entity"; | 99 | Log() << "Pipeline: New Entity"; |
100 | 100 | ||
@@ -107,7 +107,7 @@ Async::Job<void> Pipeline::newEntity(void const *command, size_t size) | |||
107 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 107 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
108 | if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) { | 108 | if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) { |
109 | qWarning() << "invalid buffer, not a create entity buffer"; | 109 | qWarning() << "invalid buffer, not a create entity buffer"; |
110 | return Async::error<void>(); | 110 | return KAsync::error<void>(); |
111 | } | 111 | } |
112 | } | 112 | } |
113 | auto createEntity = Akonadi2::Commands::GetCreateEntity(command); | 113 | auto createEntity = Akonadi2::Commands::GetCreateEntity(command); |
@@ -118,7 +118,7 @@ Async::Job<void> Pipeline::newEntity(void const *command, size_t size) | |||
118 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); | 118 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); |
119 | if (!Akonadi2::VerifyEntityBuffer(verifyer)) { | 119 | if (!Akonadi2::VerifyEntityBuffer(verifyer)) { |
120 | qWarning() << "invalid buffer, not an entity buffer"; | 120 | qWarning() << "invalid buffer, not an entity buffer"; |
121 | return Async::error<void>(); | 121 | return KAsync::error<void>(); |
122 | } | 122 | } |
123 | } | 123 | } |
124 | auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); | 124 | auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); |
@@ -139,7 +139,7 @@ Async::Job<void> Pipeline::newEntity(void const *command, size_t size) | |||
139 | storage().setMaxRevision(newRevision); | 139 | storage().setMaxRevision(newRevision); |
140 | Log() << "Pipeline: wrote entity: "<< newRevision; | 140 | Log() << "Pipeline: wrote entity: "<< newRevision; |
141 | 141 | ||
142 | return Async::start<void>([this, key, entityType](Async::Future<void> &future) { | 142 | return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) { |
143 | PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() { | 143 | PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() { |
144 | future.setFinished(); | 144 | future.setFinished(); |
145 | }); | 145 | }); |
diff --git a/common/pipeline.h b/common/pipeline.h index a574d27..d25fc56 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -53,7 +53,7 @@ public: | |||
53 | 53 | ||
54 | void null(); | 54 | void null(); |
55 | 55 | ||
56 | Async::Job<void> newEntity(void const *command, size_t size); | 56 | KAsync::Job<void> newEntity(void const *command, size_t size); |
57 | void modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size); | 57 | void modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size); |
58 | void deletedEntity(const QString &entityType, const QByteArray &key); | 58 | void deletedEntity(const QString &entityType, const QByteArray &key); |
59 | 59 | ||
diff --git a/common/resource.cpp b/common/resource.cpp index e158a40..bd69afd 100644 --- a/common/resource.cpp +++ b/common/resource.cpp | |||
@@ -53,16 +53,16 @@ void Resource::processCommand(int commandId, const QByteArray &data, uint size, | |||
53 | pipeline->null(); | 53 | pipeline->null(); |
54 | } | 54 | } |
55 | 55 | ||
56 | Async::Job<void> Resource::synchronizeWithSource(Pipeline *pipeline) | 56 | KAsync::Job<void> Resource::synchronizeWithSource(Pipeline *pipeline) |
57 | { | 57 | { |
58 | return Async::start<void>([pipeline](Async::Future<void> &f) { | 58 | return KAsync::start<void>([pipeline](KAsync::Future<void> &f) { |
59 | pipeline->null(); | 59 | pipeline->null(); |
60 | }); | 60 | }); |
61 | } | 61 | } |
62 | 62 | ||
63 | Async::Job<void> Resource::processAllMessages() | 63 | KAsync::Job<void> Resource::processAllMessages() |
64 | { | 64 | { |
65 | return Async::null<void>(); | 65 | return KAsync::null<void>(); |
66 | } | 66 | } |
67 | 67 | ||
68 | class ResourceFactory::Private | 68 | class ResourceFactory::Private |
diff --git a/common/resource.h b/common/resource.h index 18a6827..170e080 100644 --- a/common/resource.h +++ b/common/resource.h | |||
@@ -36,8 +36,8 @@ public: | |||
36 | virtual ~Resource(); | 36 | virtual ~Resource(); |
37 | 37 | ||
38 | virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); | 38 | virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); |
39 | virtual Async::Job<void> synchronizeWithSource(Pipeline *pipeline); | 39 | virtual KAsync::Job<void> synchronizeWithSource(Pipeline *pipeline); |
40 | virtual Async::Job<void> processAllMessages(); | 40 | virtual KAsync::Job<void> processAllMessages(); |
41 | 41 | ||
42 | virtual void configurePipeline(Pipeline *pipeline); | 42 | virtual void configurePipeline(Pipeline *pipeline); |
43 | 43 | ||
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index b7d569b..feffcf4 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -70,8 +70,8 @@ class ResourceAccess::Private | |||
70 | { | 70 | { |
71 | public: | 71 | public: |
72 | Private(const QByteArray &name, ResourceAccess *ra); | 72 | Private(const QByteArray &name, ResourceAccess *ra); |
73 | Async::Job<void> tryToConnect(); | 73 | KAsync::Job<void> tryToConnect(); |
74 | Async::Job<void> initializeSocket(); | 74 | KAsync::Job<void> initializeSocket(); |
75 | QByteArray resourceName; | 75 | QByteArray resourceName; |
76 | QSharedPointer<QLocalSocket> socket; | 76 | QSharedPointer<QLocalSocket> socket; |
77 | QByteArray partialMessageBuffer; | 77 | QByteArray partialMessageBuffer; |
@@ -89,10 +89,10 @@ ResourceAccess::Private::Private(const QByteArray &name, ResourceAccess *q) | |||
89 | } | 89 | } |
90 | 90 | ||
91 | //Connects to server and returns connected socket on success | 91 | //Connects to server and returns connected socket on success |
92 | Async::Job<QSharedPointer<QLocalSocket> > ResourceAccess::connectToServer(const QByteArray &identifier) | 92 | KAsync::Job<QSharedPointer<QLocalSocket> > ResourceAccess::connectToServer(const QByteArray &identifier) |
93 | { | 93 | { |
94 | auto s = QSharedPointer<QLocalSocket>::create(); | 94 | auto s = QSharedPointer<QLocalSocket>::create(); |
95 | return Async::start<QSharedPointer<QLocalSocket> >([identifier, s](Async::Future<QSharedPointer<QLocalSocket> > &future) { | 95 | return KAsync::start<QSharedPointer<QLocalSocket> >([identifier, s](KAsync::Future<QSharedPointer<QLocalSocket> > &future) { |
96 | s->setServerName(identifier); | 96 | s->setServerName(identifier); |
97 | auto context = new QObject; | 97 | auto context = new QObject; |
98 | QObject::connect(s.data(), &QLocalSocket::connected, context, [&future, &s, context]() { | 98 | QObject::connect(s.data(), &QLocalSocket::connected, context, [&future, &s, context]() { |
@@ -109,15 +109,15 @@ Async::Job<QSharedPointer<QLocalSocket> > ResourceAccess::connectToServer(const | |||
109 | }); | 109 | }); |
110 | } | 110 | } |
111 | 111 | ||
112 | Async::Job<void> ResourceAccess::Private::tryToConnect() | 112 | KAsync::Job<void> ResourceAccess::Private::tryToConnect() |
113 | { | 113 | { |
114 | return Async::dowhile([this]() -> bool { | 114 | return KAsync::dowhile([this]() -> bool { |
115 | //TODO abort after N retries? | 115 | //TODO abort after N retries? |
116 | return !socket; | 116 | return !socket; |
117 | }, | 117 | }, |
118 | [this](Async::Future<void> &future) { | 118 | [this](KAsync::Future<void> &future) { |
119 | Trace() << "Loop"; | 119 | Trace() << "Loop"; |
120 | Async::wait(50) | 120 | KAsync::wait(50) |
121 | .then(connectToServer(resourceName)) | 121 | .then(connectToServer(resourceName)) |
122 | .then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) { | 122 | .then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) { |
123 | Q_ASSERT(s); | 123 | Q_ASSERT(s); |
@@ -130,9 +130,9 @@ Async::Job<void> ResourceAccess::Private::tryToConnect() | |||
130 | }); | 130 | }); |
131 | } | 131 | } |
132 | 132 | ||
133 | Async::Job<void> ResourceAccess::Private::initializeSocket() | 133 | KAsync::Job<void> ResourceAccess::Private::initializeSocket() |
134 | { | 134 | { |
135 | return Async::start<void>([this](Async::Future<void> &future) { | 135 | return KAsync::start<void>([this](KAsync::Future<void> &future) { |
136 | Trace() << "Trying to connect"; | 136 | Trace() << "Trying to connect"; |
137 | connectToServer(resourceName).then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) { | 137 | connectToServer(resourceName).then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) { |
138 | Trace() << "Connected to resource, without having to start it."; | 138 | Trace() << "Connected to resource, without having to start it."; |
@@ -189,9 +189,9 @@ void ResourceAccess::registerCallback(uint messageId, const std::function<void(i | |||
189 | d->resultHandler.insert(messageId, callback); | 189 | d->resultHandler.insert(messageId, callback); |
190 | } | 190 | } |
191 | 191 | ||
192 | Async::Job<void> ResourceAccess::sendCommand(int commandId) | 192 | KAsync::Job<void> ResourceAccess::sendCommand(int commandId) |
193 | { | 193 | { |
194 | return Async::start<void>([this, commandId](Async::Future<void> &f) { | 194 | return KAsync::start<void>([this, commandId](KAsync::Future<void> &f) { |
195 | auto continuation = [&f](int error, const QString &errorMessage) { | 195 | auto continuation = [&f](int error, const QString &errorMessage) { |
196 | if (error) { | 196 | if (error) { |
197 | f.setError(error, errorMessage); | 197 | f.setError(error, errorMessage); |
@@ -205,11 +205,11 @@ Async::Job<void> ResourceAccess::sendCommand(int commandId) | |||
205 | }); | 205 | }); |
206 | } | 206 | } |
207 | 207 | ||
208 | Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) | 208 | KAsync::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) |
209 | { | 209 | { |
210 | //The flatbuffer is transient, but we want to store it until the job is executed | 210 | //The flatbuffer is transient, but we want to store it until the job is executed |
211 | QByteArray buffer(reinterpret_cast<const char*>(fbb.GetBufferPointer()), fbb.GetSize()); | 211 | QByteArray buffer(reinterpret_cast<const char*>(fbb.GetBufferPointer()), fbb.GetSize()); |
212 | return Async::start<void>([commandId, buffer, this](Async::Future<void> &f) { | 212 | return KAsync::start<void>([commandId, buffer, this](KAsync::Future<void> &f) { |
213 | auto callback = [&f](int error, const QString &errorMessage) { | 213 | auto callback = [&f](int error, const QString &errorMessage) { |
214 | if (error) { | 214 | if (error) { |
215 | f.setError(error, errorMessage); | 215 | f.setError(error, errorMessage); |
@@ -225,7 +225,7 @@ Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu | |||
225 | }); | 225 | }); |
226 | } | 226 | } |
227 | 227 | ||
228 | Async::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool localSync) | 228 | KAsync::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool localSync) |
229 | { | 229 | { |
230 | auto command = Akonadi2::CreateSynchronize(d->fbb, sourceSync, localSync); | 230 | auto command = Akonadi2::CreateSynchronize(d->fbb, sourceSync, localSync); |
231 | Akonadi2::FinishSynchronizeBuffer(d->fbb, command); | 231 | Akonadi2::FinishSynchronizeBuffer(d->fbb, command); |
diff --git a/common/resourceaccess.h b/common/resourceaccess.h index c16a9d2..648b12e 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h | |||
@@ -43,13 +43,13 @@ public: | |||
43 | QByteArray resourceName() const; | 43 | QByteArray resourceName() const; |
44 | bool isReady() const; | 44 | bool isReady() const; |
45 | 45 | ||
46 | Async::Job<void> sendCommand(int commandId); | 46 | KAsync::Job<void> sendCommand(int commandId); |
47 | Async::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); | 47 | KAsync::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); |
48 | Async::Job<void> synchronizeResource(bool remoteSync, bool localSync); | 48 | KAsync::Job<void> synchronizeResource(bool remoteSync, bool localSync); |
49 | /** | 49 | /** |
50 | * Tries to connect to server, and returns a connected socket on success. | 50 | * Tries to connect to server, and returns a connected socket on success. |
51 | */ | 51 | */ |
52 | static Async::Job<QSharedPointer<QLocalSocket> > connectToServer(const QByteArray &identifier); | 52 | static KAsync::Job<QSharedPointer<QLocalSocket> > connectToServer(const QByteArray &identifier); |
53 | 53 | ||
54 | public Q_SLOTS: | 54 | public Q_SLOTS: |
55 | void open(); | 55 | void open(); |