diff options
Diffstat (limited to 'common')
-rw-r--r-- | common/CMakeLists.txt | 7 | ||||
-rw-r--r-- | common/entity.fbs (renamed from common/entitybuffer.fbs) | 4 | ||||
-rw-r--r-- | common/entitybuffer.cpp | 53 | ||||
-rw-r--r-- | common/entitybuffer.h | 22 | ||||
-rw-r--r-- | common/pipeline.cpp | 29 | ||||
-rw-r--r-- | common/pipeline.h | 1 | ||||
-rw-r--r-- | common/resource.cpp | 6 | ||||
-rw-r--r-- | common/resource.h | 3 | ||||
-rw-r--r-- | common/resourceaccess.cpp | 96 | ||||
-rw-r--r-- | common/resourceaccess.h | 8 |
10 files changed, 167 insertions, 62 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index ec13e07..1a9a812 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt | |||
@@ -8,7 +8,7 @@ generate_flatbuffers( | |||
8 | commands/modifyentity | 8 | commands/modifyentity |
9 | commands/revisionupdate | 9 | commands/revisionupdate |
10 | domain/event | 10 | domain/event |
11 | entitybuffer | 11 | entity |
12 | metadata | 12 | metadata |
13 | ) | 13 | ) |
14 | 14 | ||
@@ -17,10 +17,11 @@ if (STORAGE_unqlite) | |||
17 | set(storage_SRCS unqlite/unqlite.c storage_unqlite.cpp) | 17 | set(storage_SRCS unqlite/unqlite.c storage_unqlite.cpp) |
18 | else (STORAGE_unqlite) | 18 | else (STORAGE_unqlite) |
19 | set(storage_SRCS storage_lmdb.cpp) | 19 | set(storage_SRCS storage_lmdb.cpp) |
20 | set(storage_LIBS lmdb) | 20 | set(storage_LIBS ${lmdb}) |
21 | endif (STORAGE_unqlite) | 21 | endif (STORAGE_unqlite) |
22 | 22 | ||
23 | set(command_SRCS | 23 | set(command_SRCS |
24 | entitybuffer.cpp | ||
24 | clientapi.cpp | 25 | clientapi.cpp |
25 | commands.cpp | 26 | commands.cpp |
26 | console.cpp | 27 | console.cpp |
@@ -35,7 +36,7 @@ add_library(${PROJECT_NAME} SHARED ${command_SRCS}) | |||
35 | generate_export_header(${PROJECT_NAME} BASE_NAME Akonadi2Common EXPORT_FILE_NAME akonadi2common_export.h) | 36 | generate_export_header(${PROJECT_NAME} BASE_NAME Akonadi2Common EXPORT_FILE_NAME akonadi2common_export.h) |
36 | SET_TARGET_PROPERTIES(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX) | 37 | SET_TARGET_PROPERTIES(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX) |
37 | qt5_use_modules(${PROJECT_NAME} Widgets Network) | 38 | qt5_use_modules(${PROJECT_NAME} Widgets Network) |
38 | target_link_libraries(${PROJECT_NAME} ${storage_LIBS}) | 39 | target_link_libraries(${PROJECT_NAME} ${storage_LIBS} akonadi2async) |
39 | install(TARGETS ${PROJECT_NAME} DESTINATION lib) | 40 | install(TARGETS ${PROJECT_NAME} DESTINATION lib) |
40 | 41 | ||
41 | add_subdirectory(test) | 42 | add_subdirectory(test) |
diff --git a/common/entitybuffer.fbs b/common/entity.fbs index 28c9b2a..565b1a7 100644 --- a/common/entitybuffer.fbs +++ b/common/entity.fbs | |||
@@ -1,9 +1,9 @@ | |||
1 | namespace Akonadi2; | 1 | namespace Akonadi2; |
2 | 2 | ||
3 | table EntityBuffer { | 3 | table Entity { |
4 | metadata: [ubyte]; | 4 | metadata: [ubyte]; |
5 | resource: [ubyte]; | 5 | resource: [ubyte]; |
6 | local: [ubyte]; | 6 | local: [ubyte]; |
7 | } | 7 | } |
8 | 8 | ||
9 | root_type EntityBuffer; | 9 | root_type Entity; |
diff --git a/common/entitybuffer.cpp b/common/entitybuffer.cpp new file mode 100644 index 0000000..a78e91d --- /dev/null +++ b/common/entitybuffer.cpp | |||
@@ -0,0 +1,53 @@ | |||
1 | #include "entitybuffer.h" | ||
2 | |||
3 | #include "entity_generated.h" | ||
4 | #include "metadata_generated.h" | ||
5 | #include <QDebug> | ||
6 | |||
7 | using namespace Akonadi2; | ||
8 | |||
9 | EntityBuffer::EntityBuffer(void *dataValue, int dataSize) | ||
10 | : mEntity(nullptr) | ||
11 | { | ||
12 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(dataValue), dataSize); | ||
13 | // Q_ASSERT(Akonadi2::VerifyEntity(verifyer)); | ||
14 | if (!Akonadi2::VerifyEntityBuffer(verifyer)) { | ||
15 | qWarning() << "invalid buffer"; | ||
16 | } else { | ||
17 | mEntity = Akonadi2::GetEntity(dataValue); | ||
18 | } | ||
19 | } | ||
20 | |||
21 | const flatbuffers::Vector<uint8_t>* EntityBuffer::resourceBuffer() | ||
22 | { | ||
23 | if (!mEntity) { | ||
24 | qDebug() << "no buffer"; | ||
25 | return nullptr; | ||
26 | } | ||
27 | return mEntity->resource(); | ||
28 | } | ||
29 | |||
30 | const flatbuffers::Vector<uint8_t>* EntityBuffer::metadataBuffer() | ||
31 | { | ||
32 | if (!mEntity) { | ||
33 | return nullptr; | ||
34 | } | ||
35 | return mEntity->metadata(); | ||
36 | } | ||
37 | |||
38 | const flatbuffers::Vector<uint8_t>* EntityBuffer::localBuffer() | ||
39 | { | ||
40 | if (!mEntity) { | ||
41 | return nullptr; | ||
42 | } | ||
43 | return mEntity->local(); | ||
44 | } | ||
45 | |||
46 | void EntityBuffer::extractResourceBuffer(void *dataValue, int dataSize, const std::function<void(const flatbuffers::Vector<uint8_t> *)> &handler) | ||
47 | { | ||
48 | Akonadi2::EntityBuffer buffer(dataValue, dataSize); | ||
49 | if (auto resourceData = buffer.resourceBuffer()) { | ||
50 | handler(resourceData); | ||
51 | } | ||
52 | } | ||
53 | |||
diff --git a/common/entitybuffer.h b/common/entitybuffer.h new file mode 100644 index 0000000..2a7150e --- /dev/null +++ b/common/entitybuffer.h | |||
@@ -0,0 +1,22 @@ | |||
1 | #pragma once | ||
2 | |||
3 | #include <functional> | ||
4 | #include <flatbuffers/flatbuffers.h> | ||
5 | |||
6 | namespace Akonadi2 { | ||
7 | class Entity; | ||
8 | |||
9 | class EntityBuffer { | ||
10 | public: | ||
11 | EntityBuffer(void *dataValue, int size); | ||
12 | const flatbuffers::Vector<uint8_t> *resourceBuffer(); | ||
13 | const flatbuffers::Vector<uint8_t> *metadataBuffer(); | ||
14 | const flatbuffers::Vector<uint8_t> *localBuffer(); | ||
15 | |||
16 | static void extractResourceBuffer(void *dataValue, int dataSize, const std::function<void(const flatbuffers::Vector<uint8_t> *)> &handler); | ||
17 | private: | ||
18 | const Entity *mEntity; | ||
19 | }; | ||
20 | |||
21 | } | ||
22 | |||
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 5ca8b95..dc6d389 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -24,7 +24,7 @@ | |||
24 | #include <QStandardPaths> | 24 | #include <QStandardPaths> |
25 | #include <QVector> | 25 | #include <QVector> |
26 | #include <QDebug> | 26 | #include <QDebug> |
27 | #include "entitybuffer_generated.h" | 27 | #include "entity_generated.h" |
28 | #include "metadata_generated.h" | 28 | #include "metadata_generated.h" |
29 | 29 | ||
30 | namespace Akonadi2 | 30 | namespace Akonadi2 |
@@ -76,33 +76,31 @@ void Pipeline::newEntity(const QByteArray &key, void *resourceBufferData, size_t | |||
76 | { | 76 | { |
77 | const qint64 newRevision = storage().maxRevision() + 1; | 77 | const qint64 newRevision = storage().maxRevision() + 1; |
78 | 78 | ||
79 | flatbuffers::FlatBufferBuilder fbb; | ||
80 | auto builder = Akonadi2::EntityBufferBuilder(fbb); | ||
81 | 79 | ||
80 | std::vector<uint8_t> metadataData; | ||
82 | //Add metadata buffer | 81 | //Add metadata buffer |
83 | { | 82 | { |
84 | flatbuffers::FlatBufferBuilder metadataFbb; | 83 | flatbuffers::FlatBufferBuilder metadataFbb; |
85 | auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); | 84 | auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); |
86 | metadataBuilder.add_revision(newRevision); | 85 | metadataBuilder.add_revision(newRevision); |
87 | auto metadataBuffer = metadataBuilder.Finish(); | 86 | auto metadataBuffer = metadataBuilder.Finish(); |
88 | Akonadi2::FinishMetadataBuffer(fbb, metadataBuffer); | 87 | Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); |
89 | //TODO use memcpy | 88 | metadataData.resize(metadataFbb.GetSize()); |
90 | auto metadata = fbb.CreateVector<uint8_t>(metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | 89 | std::copy_n(metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), back_inserter(metadataData)); |
91 | builder.add_metadata(metadata); | ||
92 | } | 90 | } |
93 | 91 | ||
94 | //Add resource buffer | ||
95 | { | ||
96 | //TODO use memcpy | ||
97 | auto resource = fbb.CreateVector<uint8_t>(static_cast<uint8_t*>(resourceBufferData), size); | ||
98 | builder.add_resource(resource); | ||
99 | } | ||
100 | 92 | ||
93 | flatbuffers::FlatBufferBuilder fbb; | ||
94 | auto metadata = fbb.CreateVector<uint8_t>(metadataData.data(), metadataData.size()); | ||
95 | auto resource = fbb.CreateVector<uint8_t>(static_cast<uint8_t*>(resourceBufferData), size); | ||
96 | auto builder = Akonadi2::EntityBuilder(fbb); | ||
97 | builder.add_metadata(metadata); | ||
98 | builder.add_resource(resource); | ||
101 | //We don't have a local buffer yet | 99 | //We don't have a local buffer yet |
102 | // builder.add_local(); | 100 | // builder.add_local(); |
103 | 101 | ||
104 | auto buffer = builder.Finish(); | 102 | auto buffer = builder.Finish(); |
105 | Akonadi2::FinishEntityBufferBuffer(fbb, buffer); | 103 | Akonadi2::FinishEntityBuffer(fbb, buffer); |
106 | 104 | ||
107 | qDebug() << "writing new entity" << key; | 105 | qDebug() << "writing new entity" << key; |
108 | storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); | 106 | storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); |
@@ -163,6 +161,9 @@ void Pipeline::pipelineCompleted(const PipelineState &state) | |||
163 | emit revisionUpdated(); | 161 | emit revisionUpdated(); |
164 | } | 162 | } |
165 | scheduleStep(); | 163 | scheduleStep(); |
164 | if (d->activePipelines.isEmpty()) { | ||
165 | emit pipelinesDrained(); | ||
166 | } | ||
166 | } | 167 | } |
167 | 168 | ||
168 | 169 | ||
diff --git a/common/pipeline.h b/common/pipeline.h index 159cc1c..6ef8703 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -55,6 +55,7 @@ public: | |||
55 | 55 | ||
56 | Q_SIGNALS: | 56 | Q_SIGNALS: |
57 | void revisionUpdated(); | 57 | void revisionUpdated(); |
58 | void pipelinesDrained(); | ||
58 | 59 | ||
59 | private Q_SLOTS: | 60 | private Q_SLOTS: |
60 | void stepPipelines(); | 61 | void stepPipelines(); |
diff --git a/common/resource.cpp b/common/resource.cpp index ae28485..bba6609 100644 --- a/common/resource.cpp +++ b/common/resource.cpp | |||
@@ -48,9 +48,11 @@ void Resource::processCommand(int commandId, const QByteArray &data, uint size, | |||
48 | pipeline->null(); | 48 | pipeline->null(); |
49 | } | 49 | } |
50 | 50 | ||
51 | void Resource::synchronizeWithSource(Pipeline *pipeline) | 51 | Async::Job<void> Resource::synchronizeWithSource(Pipeline *pipeline) |
52 | { | 52 | { |
53 | pipeline->null(); | 53 | return Async::start<void>([pipeline](Async::Future<void> &f) { |
54 | pipeline->null(); | ||
55 | }); | ||
54 | } | 56 | } |
55 | 57 | ||
56 | class ResourceFactory::Private | 58 | class ResourceFactory::Private |
diff --git a/common/resource.h b/common/resource.h index 0f65e1f..fb42c1b 100644 --- a/common/resource.h +++ b/common/resource.h | |||
@@ -21,6 +21,7 @@ | |||
21 | #include <akonadi2common_export.h> | 21 | #include <akonadi2common_export.h> |
22 | #include <clientapi.h> | 22 | #include <clientapi.h> |
23 | #include <pipeline.h> | 23 | #include <pipeline.h> |
24 | #include <async/src/async.h> | ||
24 | 25 | ||
25 | namespace Akonadi2 | 26 | namespace Akonadi2 |
26 | { | 27 | { |
@@ -33,7 +34,7 @@ public: | |||
33 | virtual ~Resource(); | 34 | virtual ~Resource(); |
34 | 35 | ||
35 | virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); | 36 | virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); |
36 | virtual void synchronizeWithSource(Pipeline *pipeline); | 37 | virtual Async::Job<void> synchronizeWithSource(Pipeline *pipeline); |
37 | 38 | ||
38 | private: | 39 | private: |
39 | class Private; | 40 | class Private; |
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 1706ac4..31b9e79 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -37,38 +37,36 @@ namespace Akonadi2 | |||
37 | class QueuedCommand | 37 | class QueuedCommand |
38 | { | 38 | { |
39 | public: | 39 | public: |
40 | QueuedCommand(int commandId) | 40 | QueuedCommand(int commandId, const std::function<void()> &callback) |
41 | : m_commandId(commandId), | 41 | : commandId(commandId), |
42 | m_bufferSize(0), | 42 | bufferSize(0), |
43 | m_buffer(0) | 43 | buffer(0), |
44 | callback(callback) | ||
44 | {} | 45 | {} |
45 | 46 | ||
46 | QueuedCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) | 47 | QueuedCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback) |
47 | : m_commandId(commandId), | 48 | : commandId(commandId), |
48 | m_bufferSize(fbb.GetSize()), | 49 | bufferSize(fbb.GetSize()), |
49 | m_buffer(new char[m_bufferSize]) | 50 | buffer(new char[bufferSize]), |
51 | callback(callback) | ||
50 | { | 52 | { |
51 | memcpy(m_buffer, fbb.GetBufferPointer(), m_bufferSize); | 53 | memcpy(buffer, fbb.GetBufferPointer(), bufferSize); |
52 | } | 54 | } |
53 | 55 | ||
54 | ~QueuedCommand() | 56 | ~QueuedCommand() |
55 | { | 57 | { |
56 | delete[] m_buffer; | 58 | delete[] buffer; |
57 | } | ||
58 | |||
59 | void write(QIODevice *device, uint messageId) | ||
60 | { | ||
61 | // Console::main()->log(QString("\tSending queued command %1").arg(m_commandId)); | ||
62 | Commands::write(device, messageId, m_commandId, m_buffer, m_bufferSize); | ||
63 | } | 59 | } |
64 | 60 | ||
65 | private: | 61 | private: |
66 | QueuedCommand(const QueuedCommand &other); | 62 | QueuedCommand(const QueuedCommand &other); |
67 | QueuedCommand &operator=(const QueuedCommand &rhs); | 63 | QueuedCommand &operator=(const QueuedCommand &rhs); |
68 | 64 | ||
69 | const int m_commandId; | 65 | public: |
70 | const uint m_bufferSize; | 66 | const int commandId; |
71 | char *m_buffer; | 67 | const uint bufferSize; |
68 | char *buffer; | ||
69 | std::function<void()> callback; | ||
72 | }; | 70 | }; |
73 | 71 | ||
74 | class ResourceAccess::Private | 72 | class ResourceAccess::Private |
@@ -82,7 +80,7 @@ public: | |||
82 | QByteArray partialMessageBuffer; | 80 | QByteArray partialMessageBuffer; |
83 | flatbuffers::FlatBufferBuilder fbb; | 81 | flatbuffers::FlatBufferBuilder fbb; |
84 | QVector<QueuedCommand *> commandQueue; | 82 | QVector<QueuedCommand *> commandQueue; |
85 | QVector<std::function<void()> > synchronizeResultHandler; | 83 | QMultiMap<uint, std::function<void()> > resultHandler; |
86 | uint messageId; | 84 | uint messageId; |
87 | }; | 85 | }; |
88 | 86 | ||
@@ -130,31 +128,42 @@ bool ResourceAccess::isReady() const | |||
130 | return d->socket->isValid(); | 128 | return d->socket->isValid(); |
131 | } | 129 | } |
132 | 130 | ||
133 | void ResourceAccess::sendCommand(int commandId) | 131 | void ResourceAccess::registerCallback(uint messageId, const std::function<void()> &callback) |
132 | { | ||
133 | d->resultHandler.insert(messageId, callback); | ||
134 | } | ||
135 | |||
136 | void ResourceAccess::sendCommand(int commandId, const std::function<void()> &callback) | ||
134 | { | 137 | { |
135 | if (isReady()) { | 138 | if (isReady()) { |
136 | log(QString("Sending command %1").arg(commandId)); | 139 | log(QString("Sending command %1").arg(commandId)); |
137 | Commands::write(d->socket, ++d->messageId, commandId); | 140 | d->messageId++; |
141 | if (callback) { | ||
142 | registerCallback(d->messageId, callback); | ||
143 | } | ||
144 | Commands::write(d->socket, d->messageId, commandId); | ||
138 | } else { | 145 | } else { |
139 | d->commandQueue << new QueuedCommand(commandId); | 146 | d->commandQueue << new QueuedCommand(commandId, callback); |
140 | } | 147 | } |
141 | } | 148 | } |
142 | 149 | ||
143 | void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) | 150 | void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback) |
144 | { | 151 | { |
145 | if (isReady()) { | 152 | if (isReady()) { |
146 | log(QString("Sending command %1").arg(commandId)); | 153 | log(QString("Sending command %1").arg(commandId)); |
147 | Commands::write(d->socket, ++d->messageId, commandId, fbb); | 154 | d->messageId++; |
155 | if (callback) { | ||
156 | registerCallback(d->messageId, callback); | ||
157 | } | ||
158 | Commands::write(d->socket, d->messageId, commandId, fbb); | ||
148 | } else { | 159 | } else { |
149 | d->commandQueue << new QueuedCommand(commandId, fbb); | 160 | d->commandQueue << new QueuedCommand(commandId, fbb, callback); |
150 | } | 161 | } |
151 | } | 162 | } |
152 | 163 | ||
153 | void ResourceAccess::synchronizeResource(const std::function<void()> &resultHandler) | 164 | void ResourceAccess::synchronizeResource(const std::function<void()> &resultHandler) |
154 | { | 165 | { |
155 | sendCommand(Commands::SynchronizeCommand); | 166 | sendCommand(Commands::SynchronizeCommand, resultHandler); |
156 | //TODO: this should be implemented as a job, so we don't need to store the result handler as member | ||
157 | d->synchronizeResultHandler << resultHandler; | ||
158 | } | 167 | } |
159 | 168 | ||
160 | void ResourceAccess::open() | 169 | void ResourceAccess::open() |
@@ -200,7 +209,12 @@ void ResourceAccess::connected() | |||
200 | //TODO: serialize instead of blast them all through the socket? | 209 | //TODO: serialize instead of blast them all through the socket? |
201 | log(QString("We have %1 queued commands").arg(d->commandQueue.size())); | 210 | log(QString("We have %1 queued commands").arg(d->commandQueue.size())); |
202 | for (QueuedCommand *command: d->commandQueue) { | 211 | for (QueuedCommand *command: d->commandQueue) { |
203 | command->write(d->socket, ++d->messageId); | 212 | d->messageId++; |
213 | log(QString("Sending command %1").arg(command->commandId)); | ||
214 | if (command->callback) { | ||
215 | registerCallback(d->messageId, command->callback); | ||
216 | } | ||
217 | Commands::write(d->socket, d->messageId, command->commandId, command->buffer, command->bufferSize); | ||
204 | delete command; | 218 | delete command; |
205 | } | 219 | } |
206 | d->commandQueue.clear(); | 220 | d->commandQueue.clear(); |
@@ -234,6 +248,8 @@ void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error) | |||
234 | if (!d->tryOpenTimer->isActive()) { | 248 | if (!d->tryOpenTimer->isActive()) { |
235 | d->tryOpenTimer->start(); | 249 | d->tryOpenTimer->start(); |
236 | } | 250 | } |
251 | } else { | ||
252 | qWarning() << "Failed to start resource"; | ||
237 | } | 253 | } |
238 | } | 254 | } |
239 | 255 | ||
@@ -256,8 +272,7 @@ bool ResourceAccess::processMessageBuffer() | |||
256 | return false; | 272 | return false; |
257 | } | 273 | } |
258 | 274 | ||
259 | //messageId is unused, so commented out | 275 | const uint messageId = *(int*)(d->partialMessageBuffer.constData()); |
260 | //const uint messageId = *(int*)(d->partialMessageBuffer.constData()); | ||
261 | const int commandId = *(int*)(d->partialMessageBuffer.constData() + sizeof(uint)); | 276 | const int commandId = *(int*)(d->partialMessageBuffer.constData() + sizeof(uint)); |
262 | const uint size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint)); | 277 | const uint size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint)); |
263 | 278 | ||
@@ -271,18 +286,15 @@ bool ResourceAccess::processMessageBuffer() | |||
271 | log(QString("Revision updated to: %1").arg(buffer->revision())); | 286 | log(QString("Revision updated to: %1").arg(buffer->revision())); |
272 | emit revisionChanged(buffer->revision()); | 287 | emit revisionChanged(buffer->revision()); |
273 | 288 | ||
274 | //FIXME: The result handler should be called on completion of the synchronize command, and not upon arbitrary revision updates. | ||
275 | for(auto handler : d->synchronizeResultHandler) { | ||
276 | //FIXME: we should associate the handler with a buffer->id() to avoid prematurely triggering the result handler from a delayed synchronized response (this is relevant for on-demand syncing). | ||
277 | handler(); | ||
278 | } | ||
279 | d->synchronizeResultHandler.clear(); | ||
280 | break; | 289 | break; |
281 | } | 290 | } |
282 | case Commands::CommandCompletion: { | 291 | case Commands::CommandCompletion: { |
283 | auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); | 292 | auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); |
284 | log(QString("Command %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); | 293 | log(QString("Command %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); |
285 | //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc | 294 | //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc |
295 | |||
296 | //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first | ||
297 | QMetaObject::invokeMethod(this, "callCallbacks", Qt::QueuedConnection, QGenericReturnArgument(), Q_ARG(int, buffer->id())); | ||
286 | break; | 298 | break; |
287 | } | 299 | } |
288 | default: | 300 | default: |
@@ -293,6 +305,14 @@ bool ResourceAccess::processMessageBuffer() | |||
293 | return d->partialMessageBuffer.size() >= headerSize; | 305 | return d->partialMessageBuffer.size() >= headerSize; |
294 | } | 306 | } |
295 | 307 | ||
308 | void ResourceAccess::callCallbacks(int id) | ||
309 | { | ||
310 | for(auto handler : d->resultHandler.values(id)) { | ||
311 | handler(); | ||
312 | } | ||
313 | d->resultHandler.remove(id); | ||
314 | } | ||
315 | |||
296 | void ResourceAccess::log(const QString &message) | 316 | void ResourceAccess::log(const QString &message) |
297 | { | 317 | { |
298 | qDebug() << d->resourceName + ": " + message; | 318 | qDebug() << d->resourceName + ": " + message; |
diff --git a/common/resourceaccess.h b/common/resourceaccess.h index 7416b25..d79c993 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h | |||
@@ -40,8 +40,9 @@ public: | |||
40 | QString resourceName() const; | 40 | QString resourceName() const; |
41 | bool isReady() const; | 41 | bool isReady() const; |
42 | 42 | ||
43 | void sendCommand(int commandId); | 43 | //TODO use jobs |
44 | void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); | 44 | void sendCommand(int commandId, const std::function<void()> &callback = std::function<void()>()); |
45 | void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback); | ||
45 | void synchronizeResource(const std::function<void()> &resultHandler); | 46 | void synchronizeResource(const std::function<void()> &resultHandler); |
46 | 47 | ||
47 | public Q_SLOTS: | 48 | public Q_SLOTS: |
@@ -51,6 +52,7 @@ public Q_SLOTS: | |||
51 | Q_SIGNALS: | 52 | Q_SIGNALS: |
52 | void ready(bool isReady); | 53 | void ready(bool isReady); |
53 | void revisionChanged(unsigned long long revision); | 54 | void revisionChanged(unsigned long long revision); |
55 | void commandCompleted(); | ||
54 | 56 | ||
55 | private Q_SLOTS: | 57 | private Q_SLOTS: |
56 | //TODO: move these to the Private class | 58 | //TODO: move these to the Private class |
@@ -59,9 +61,11 @@ private Q_SLOTS: | |||
59 | void connectionError(QLocalSocket::LocalSocketError error); | 61 | void connectionError(QLocalSocket::LocalSocketError error); |
60 | void readResourceMessage(); | 62 | void readResourceMessage(); |
61 | bool processMessageBuffer(); | 63 | bool processMessageBuffer(); |
64 | void callCallbacks(int id); | ||
62 | 65 | ||
63 | private: | 66 | private: |
64 | void log(const QString &message); | 67 | void log(const QString &message); |
68 | void registerCallback(uint messageId, const std::function<void()> &callback); | ||
65 | 69 | ||
66 | class Private; | 70 | class Private; |
67 | Private * const d; | 71 | Private * const d; |