diff options
-rw-r--r-- | async/src/CMakeLists.txt | 7 | ||||
-rw-r--r-- | common/CMakeLists.txt | 7 | ||||
-rw-r--r-- | common/entity.fbs (renamed from common/entitybuffer.fbs) | 4 | ||||
-rw-r--r-- | common/entitybuffer.cpp | 53 | ||||
-rw-r--r-- | common/entitybuffer.h | 22 | ||||
-rw-r--r-- | common/pipeline.cpp | 29 | ||||
-rw-r--r-- | common/pipeline.h | 1 | ||||
-rw-r--r-- | common/resource.cpp | 6 | ||||
-rw-r--r-- | common/resource.h | 3 | ||||
-rw-r--r-- | common/resourceaccess.cpp | 96 | ||||
-rw-r--r-- | common/resourceaccess.h | 8 | ||||
-rw-r--r-- | dummyresource/facade.cpp | 48 | ||||
-rw-r--r-- | dummyresource/resourcefactory.cpp | 94 | ||||
-rw-r--r-- | dummyresource/resourcefactory.h | 3 | ||||
-rw-r--r-- | synchronizer/listener.cpp | 155 | ||||
-rw-r--r-- | synchronizer/listener.h | 6 | ||||
-rw-r--r-- | tests/dummyresourcefacadetest.cpp | 15 | ||||
-rw-r--r-- | tests/dummyresourcetest.cpp | 15 |
18 files changed, 371 insertions, 201 deletions
diff --git a/async/src/CMakeLists.txt b/async/src/CMakeLists.txt index a371da0..85700ed 100644 --- a/async/src/CMakeLists.txt +++ b/async/src/CMakeLists.txt | |||
@@ -1,3 +1,5 @@ | |||
1 | project(akonadi2async) | ||
2 | |||
1 | include_directories(${CMAKE_CURRENT_BINARY_DIR}) | 3 | include_directories(${CMAKE_CURRENT_BINARY_DIR}) |
2 | 4 | ||
3 | set(async_SRCS | 5 | set(async_SRCS |
@@ -5,5 +7,6 @@ set(async_SRCS | |||
5 | future.cpp | 7 | future.cpp |
6 | ) | 8 | ) |
7 | 9 | ||
8 | add_library(akonadi2async SHARED ${async_SRCS}) | 10 | add_library(${PROJECT_NAME} SHARED ${async_SRCS}) |
9 | target_link_libraries(akonadi2async Qt5::Core) | 11 | target_link_libraries(${PROJECT_NAME} Qt5::Core) |
12 | install(TARGETS ${PROJECT_NAME} DESTINATION lib) | ||
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index ec13e07..1a9a812 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt | |||
@@ -8,7 +8,7 @@ generate_flatbuffers( | |||
8 | commands/modifyentity | 8 | commands/modifyentity |
9 | commands/revisionupdate | 9 | commands/revisionupdate |
10 | domain/event | 10 | domain/event |
11 | entitybuffer | 11 | entity |
12 | metadata | 12 | metadata |
13 | ) | 13 | ) |
14 | 14 | ||
@@ -17,10 +17,11 @@ if (STORAGE_unqlite) | |||
17 | set(storage_SRCS unqlite/unqlite.c storage_unqlite.cpp) | 17 | set(storage_SRCS unqlite/unqlite.c storage_unqlite.cpp) |
18 | else (STORAGE_unqlite) | 18 | else (STORAGE_unqlite) |
19 | set(storage_SRCS storage_lmdb.cpp) | 19 | set(storage_SRCS storage_lmdb.cpp) |
20 | set(storage_LIBS lmdb) | 20 | set(storage_LIBS ${lmdb}) |
21 | endif (STORAGE_unqlite) | 21 | endif (STORAGE_unqlite) |
22 | 22 | ||
23 | set(command_SRCS | 23 | set(command_SRCS |
24 | entitybuffer.cpp | ||
24 | clientapi.cpp | 25 | clientapi.cpp |
25 | commands.cpp | 26 | commands.cpp |
26 | console.cpp | 27 | console.cpp |
@@ -35,7 +36,7 @@ add_library(${PROJECT_NAME} SHARED ${command_SRCS}) | |||
35 | generate_export_header(${PROJECT_NAME} BASE_NAME Akonadi2Common EXPORT_FILE_NAME akonadi2common_export.h) | 36 | generate_export_header(${PROJECT_NAME} BASE_NAME Akonadi2Common EXPORT_FILE_NAME akonadi2common_export.h) |
36 | SET_TARGET_PROPERTIES(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX) | 37 | SET_TARGET_PROPERTIES(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX) |
37 | qt5_use_modules(${PROJECT_NAME} Widgets Network) | 38 | qt5_use_modules(${PROJECT_NAME} Widgets Network) |
38 | target_link_libraries(${PROJECT_NAME} ${storage_LIBS}) | 39 | target_link_libraries(${PROJECT_NAME} ${storage_LIBS} akonadi2async) |
39 | install(TARGETS ${PROJECT_NAME} DESTINATION lib) | 40 | install(TARGETS ${PROJECT_NAME} DESTINATION lib) |
40 | 41 | ||
41 | add_subdirectory(test) | 42 | add_subdirectory(test) |
diff --git a/common/entitybuffer.fbs b/common/entity.fbs index 28c9b2a..565b1a7 100644 --- a/common/entitybuffer.fbs +++ b/common/entity.fbs | |||
@@ -1,9 +1,9 @@ | |||
1 | namespace Akonadi2; | 1 | namespace Akonadi2; |
2 | 2 | ||
3 | table EntityBuffer { | 3 | table Entity { |
4 | metadata: [ubyte]; | 4 | metadata: [ubyte]; |
5 | resource: [ubyte]; | 5 | resource: [ubyte]; |
6 | local: [ubyte]; | 6 | local: [ubyte]; |
7 | } | 7 | } |
8 | 8 | ||
9 | root_type EntityBuffer; | 9 | root_type Entity; |
diff --git a/common/entitybuffer.cpp b/common/entitybuffer.cpp new file mode 100644 index 0000000..a78e91d --- /dev/null +++ b/common/entitybuffer.cpp | |||
@@ -0,0 +1,53 @@ | |||
1 | #include "entitybuffer.h" | ||
2 | |||
3 | #include "entity_generated.h" | ||
4 | #include "metadata_generated.h" | ||
5 | #include <QDebug> | ||
6 | |||
7 | using namespace Akonadi2; | ||
8 | |||
9 | EntityBuffer::EntityBuffer(void *dataValue, int dataSize) | ||
10 | : mEntity(nullptr) | ||
11 | { | ||
12 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(dataValue), dataSize); | ||
13 | // Q_ASSERT(Akonadi2::VerifyEntity(verifyer)); | ||
14 | if (!Akonadi2::VerifyEntityBuffer(verifyer)) { | ||
15 | qWarning() << "invalid buffer"; | ||
16 | } else { | ||
17 | mEntity = Akonadi2::GetEntity(dataValue); | ||
18 | } | ||
19 | } | ||
20 | |||
21 | const flatbuffers::Vector<uint8_t>* EntityBuffer::resourceBuffer() | ||
22 | { | ||
23 | if (!mEntity) { | ||
24 | qDebug() << "no buffer"; | ||
25 | return nullptr; | ||
26 | } | ||
27 | return mEntity->resource(); | ||
28 | } | ||
29 | |||
30 | const flatbuffers::Vector<uint8_t>* EntityBuffer::metadataBuffer() | ||
31 | { | ||
32 | if (!mEntity) { | ||
33 | return nullptr; | ||
34 | } | ||
35 | return mEntity->metadata(); | ||
36 | } | ||
37 | |||
38 | const flatbuffers::Vector<uint8_t>* EntityBuffer::localBuffer() | ||
39 | { | ||
40 | if (!mEntity) { | ||
41 | return nullptr; | ||
42 | } | ||
43 | return mEntity->local(); | ||
44 | } | ||
45 | |||
46 | void EntityBuffer::extractResourceBuffer(void *dataValue, int dataSize, const std::function<void(const flatbuffers::Vector<uint8_t> *)> &handler) | ||
47 | { | ||
48 | Akonadi2::EntityBuffer buffer(dataValue, dataSize); | ||
49 | if (auto resourceData = buffer.resourceBuffer()) { | ||
50 | handler(resourceData); | ||
51 | } | ||
52 | } | ||
53 | |||
diff --git a/common/entitybuffer.h b/common/entitybuffer.h new file mode 100644 index 0000000..2a7150e --- /dev/null +++ b/common/entitybuffer.h | |||
@@ -0,0 +1,22 @@ | |||
1 | #pragma once | ||
2 | |||
3 | #include <functional> | ||
4 | #include <flatbuffers/flatbuffers.h> | ||
5 | |||
6 | namespace Akonadi2 { | ||
7 | class Entity; | ||
8 | |||
9 | class EntityBuffer { | ||
10 | public: | ||
11 | EntityBuffer(void *dataValue, int size); | ||
12 | const flatbuffers::Vector<uint8_t> *resourceBuffer(); | ||
13 | const flatbuffers::Vector<uint8_t> *metadataBuffer(); | ||
14 | const flatbuffers::Vector<uint8_t> *localBuffer(); | ||
15 | |||
16 | static void extractResourceBuffer(void *dataValue, int dataSize, const std::function<void(const flatbuffers::Vector<uint8_t> *)> &handler); | ||
17 | private: | ||
18 | const Entity *mEntity; | ||
19 | }; | ||
20 | |||
21 | } | ||
22 | |||
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 5ca8b95..dc6d389 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -24,7 +24,7 @@ | |||
24 | #include <QStandardPaths> | 24 | #include <QStandardPaths> |
25 | #include <QVector> | 25 | #include <QVector> |
26 | #include <QDebug> | 26 | #include <QDebug> |
27 | #include "entitybuffer_generated.h" | 27 | #include "entity_generated.h" |
28 | #include "metadata_generated.h" | 28 | #include "metadata_generated.h" |
29 | 29 | ||
30 | namespace Akonadi2 | 30 | namespace Akonadi2 |
@@ -76,33 +76,31 @@ void Pipeline::newEntity(const QByteArray &key, void *resourceBufferData, size_t | |||
76 | { | 76 | { |
77 | const qint64 newRevision = storage().maxRevision() + 1; | 77 | const qint64 newRevision = storage().maxRevision() + 1; |
78 | 78 | ||
79 | flatbuffers::FlatBufferBuilder fbb; | ||
80 | auto builder = Akonadi2::EntityBufferBuilder(fbb); | ||
81 | 79 | ||
80 | std::vector<uint8_t> metadataData; | ||
82 | //Add metadata buffer | 81 | //Add metadata buffer |
83 | { | 82 | { |
84 | flatbuffers::FlatBufferBuilder metadataFbb; | 83 | flatbuffers::FlatBufferBuilder metadataFbb; |
85 | auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); | 84 | auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); |
86 | metadataBuilder.add_revision(newRevision); | 85 | metadataBuilder.add_revision(newRevision); |
87 | auto metadataBuffer = metadataBuilder.Finish(); | 86 | auto metadataBuffer = metadataBuilder.Finish(); |
88 | Akonadi2::FinishMetadataBuffer(fbb, metadataBuffer); | 87 | Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); |
89 | //TODO use memcpy | 88 | metadataData.resize(metadataFbb.GetSize()); |
90 | auto metadata = fbb.CreateVector<uint8_t>(metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | 89 | std::copy_n(metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), back_inserter(metadataData)); |
91 | builder.add_metadata(metadata); | ||
92 | } | 90 | } |
93 | 91 | ||
94 | //Add resource buffer | ||
95 | { | ||
96 | //TODO use memcpy | ||
97 | auto resource = fbb.CreateVector<uint8_t>(static_cast<uint8_t*>(resourceBufferData), size); | ||
98 | builder.add_resource(resource); | ||
99 | } | ||
100 | 92 | ||
93 | flatbuffers::FlatBufferBuilder fbb; | ||
94 | auto metadata = fbb.CreateVector<uint8_t>(metadataData.data(), metadataData.size()); | ||
95 | auto resource = fbb.CreateVector<uint8_t>(static_cast<uint8_t*>(resourceBufferData), size); | ||
96 | auto builder = Akonadi2::EntityBuilder(fbb); | ||
97 | builder.add_metadata(metadata); | ||
98 | builder.add_resource(resource); | ||
101 | //We don't have a local buffer yet | 99 | //We don't have a local buffer yet |
102 | // builder.add_local(); | 100 | // builder.add_local(); |
103 | 101 | ||
104 | auto buffer = builder.Finish(); | 102 | auto buffer = builder.Finish(); |
105 | Akonadi2::FinishEntityBufferBuffer(fbb, buffer); | 103 | Akonadi2::FinishEntityBuffer(fbb, buffer); |
106 | 104 | ||
107 | qDebug() << "writing new entity" << key; | 105 | qDebug() << "writing new entity" << key; |
108 | storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); | 106 | storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); |
@@ -163,6 +161,9 @@ void Pipeline::pipelineCompleted(const PipelineState &state) | |||
163 | emit revisionUpdated(); | 161 | emit revisionUpdated(); |
164 | } | 162 | } |
165 | scheduleStep(); | 163 | scheduleStep(); |
164 | if (d->activePipelines.isEmpty()) { | ||
165 | emit pipelinesDrained(); | ||
166 | } | ||
166 | } | 167 | } |
167 | 168 | ||
168 | 169 | ||
diff --git a/common/pipeline.h b/common/pipeline.h index 159cc1c..6ef8703 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -55,6 +55,7 @@ public: | |||
55 | 55 | ||
56 | Q_SIGNALS: | 56 | Q_SIGNALS: |
57 | void revisionUpdated(); | 57 | void revisionUpdated(); |
58 | void pipelinesDrained(); | ||
58 | 59 | ||
59 | private Q_SLOTS: | 60 | private Q_SLOTS: |
60 | void stepPipelines(); | 61 | void stepPipelines(); |
diff --git a/common/resource.cpp b/common/resource.cpp index ae28485..bba6609 100644 --- a/common/resource.cpp +++ b/common/resource.cpp | |||
@@ -48,9 +48,11 @@ void Resource::processCommand(int commandId, const QByteArray &data, uint size, | |||
48 | pipeline->null(); | 48 | pipeline->null(); |
49 | } | 49 | } |
50 | 50 | ||
51 | void Resource::synchronizeWithSource(Pipeline *pipeline) | 51 | Async::Job<void> Resource::synchronizeWithSource(Pipeline *pipeline) |
52 | { | 52 | { |
53 | pipeline->null(); | 53 | return Async::start<void>([pipeline](Async::Future<void> &f) { |
54 | pipeline->null(); | ||
55 | }); | ||
54 | } | 56 | } |
55 | 57 | ||
56 | class ResourceFactory::Private | 58 | class ResourceFactory::Private |
diff --git a/common/resource.h b/common/resource.h index 0f65e1f..fb42c1b 100644 --- a/common/resource.h +++ b/common/resource.h | |||
@@ -21,6 +21,7 @@ | |||
21 | #include <akonadi2common_export.h> | 21 | #include <akonadi2common_export.h> |
22 | #include <clientapi.h> | 22 | #include <clientapi.h> |
23 | #include <pipeline.h> | 23 | #include <pipeline.h> |
24 | #include <async/src/async.h> | ||
24 | 25 | ||
25 | namespace Akonadi2 | 26 | namespace Akonadi2 |
26 | { | 27 | { |
@@ -33,7 +34,7 @@ public: | |||
33 | virtual ~Resource(); | 34 | virtual ~Resource(); |
34 | 35 | ||
35 | virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); | 36 | virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); |
36 | virtual void synchronizeWithSource(Pipeline *pipeline); | 37 | virtual Async::Job<void> synchronizeWithSource(Pipeline *pipeline); |
37 | 38 | ||
38 | private: | 39 | private: |
39 | class Private; | 40 | class Private; |
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 1706ac4..31b9e79 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -37,38 +37,36 @@ namespace Akonadi2 | |||
37 | class QueuedCommand | 37 | class QueuedCommand |
38 | { | 38 | { |
39 | public: | 39 | public: |
40 | QueuedCommand(int commandId) | 40 | QueuedCommand(int commandId, const std::function<void()> &callback) |
41 | : m_commandId(commandId), | 41 | : commandId(commandId), |
42 | m_bufferSize(0), | 42 | bufferSize(0), |
43 | m_buffer(0) | 43 | buffer(0), |
44 | callback(callback) | ||
44 | {} | 45 | {} |
45 | 46 | ||
46 | QueuedCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) | 47 | QueuedCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback) |
47 | : m_commandId(commandId), | 48 | : commandId(commandId), |
48 | m_bufferSize(fbb.GetSize()), | 49 | bufferSize(fbb.GetSize()), |
49 | m_buffer(new char[m_bufferSize]) | 50 | buffer(new char[bufferSize]), |
51 | callback(callback) | ||
50 | { | 52 | { |
51 | memcpy(m_buffer, fbb.GetBufferPointer(), m_bufferSize); | 53 | memcpy(buffer, fbb.GetBufferPointer(), bufferSize); |
52 | } | 54 | } |
53 | 55 | ||
54 | ~QueuedCommand() | 56 | ~QueuedCommand() |
55 | { | 57 | { |
56 | delete[] m_buffer; | 58 | delete[] buffer; |
57 | } | ||
58 | |||
59 | void write(QIODevice *device, uint messageId) | ||
60 | { | ||
61 | // Console::main()->log(QString("\tSending queued command %1").arg(m_commandId)); | ||
62 | Commands::write(device, messageId, m_commandId, m_buffer, m_bufferSize); | ||
63 | } | 59 | } |
64 | 60 | ||
65 | private: | 61 | private: |
66 | QueuedCommand(const QueuedCommand &other); | 62 | QueuedCommand(const QueuedCommand &other); |
67 | QueuedCommand &operator=(const QueuedCommand &rhs); | 63 | QueuedCommand &operator=(const QueuedCommand &rhs); |
68 | 64 | ||
69 | const int m_commandId; | 65 | public: |
70 | const uint m_bufferSize; | 66 | const int commandId; |
71 | char *m_buffer; | 67 | const uint bufferSize; |
68 | char *buffer; | ||
69 | std::function<void()> callback; | ||
72 | }; | 70 | }; |
73 | 71 | ||
74 | class ResourceAccess::Private | 72 | class ResourceAccess::Private |
@@ -82,7 +80,7 @@ public: | |||
82 | QByteArray partialMessageBuffer; | 80 | QByteArray partialMessageBuffer; |
83 | flatbuffers::FlatBufferBuilder fbb; | 81 | flatbuffers::FlatBufferBuilder fbb; |
84 | QVector<QueuedCommand *> commandQueue; | 82 | QVector<QueuedCommand *> commandQueue; |
85 | QVector<std::function<void()> > synchronizeResultHandler; | 83 | QMultiMap<uint, std::function<void()> > resultHandler; |
86 | uint messageId; | 84 | uint messageId; |
87 | }; | 85 | }; |
88 | 86 | ||
@@ -130,31 +128,42 @@ bool ResourceAccess::isReady() const | |||
130 | return d->socket->isValid(); | 128 | return d->socket->isValid(); |
131 | } | 129 | } |
132 | 130 | ||
133 | void ResourceAccess::sendCommand(int commandId) | 131 | void ResourceAccess::registerCallback(uint messageId, const std::function<void()> &callback) |
132 | { | ||
133 | d->resultHandler.insert(messageId, callback); | ||
134 | } | ||
135 | |||
136 | void ResourceAccess::sendCommand(int commandId, const std::function<void()> &callback) | ||
134 | { | 137 | { |
135 | if (isReady()) { | 138 | if (isReady()) { |
136 | log(QString("Sending command %1").arg(commandId)); | 139 | log(QString("Sending command %1").arg(commandId)); |
137 | Commands::write(d->socket, ++d->messageId, commandId); | 140 | d->messageId++; |
141 | if (callback) { | ||
142 | registerCallback(d->messageId, callback); | ||
143 | } | ||
144 | Commands::write(d->socket, d->messageId, commandId); | ||
138 | } else { | 145 | } else { |
139 | d->commandQueue << new QueuedCommand(commandId); | 146 | d->commandQueue << new QueuedCommand(commandId, callback); |
140 | } | 147 | } |
141 | } | 148 | } |
142 | 149 | ||
143 | void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) | 150 | void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback) |
144 | { | 151 | { |
145 | if (isReady()) { | 152 | if (isReady()) { |
146 | log(QString("Sending command %1").arg(commandId)); | 153 | log(QString("Sending command %1").arg(commandId)); |
147 | Commands::write(d->socket, ++d->messageId, commandId, fbb); | 154 | d->messageId++; |
155 | if (callback) { | ||
156 | registerCallback(d->messageId, callback); | ||
157 | } | ||
158 | Commands::write(d->socket, d->messageId, commandId, fbb); | ||
148 | } else { | 159 | } else { |
149 | d->commandQueue << new QueuedCommand(commandId, fbb); | 160 | d->commandQueue << new QueuedCommand(commandId, fbb, callback); |
150 | } | 161 | } |
151 | } | 162 | } |
152 | 163 | ||
153 | void ResourceAccess::synchronizeResource(const std::function<void()> &resultHandler) | 164 | void ResourceAccess::synchronizeResource(const std::function<void()> &resultHandler) |
154 | { | 165 | { |
155 | sendCommand(Commands::SynchronizeCommand); | 166 | sendCommand(Commands::SynchronizeCommand, resultHandler); |
156 | //TODO: this should be implemented as a job, so we don't need to store the result handler as member | ||
157 | d->synchronizeResultHandler << resultHandler; | ||
158 | } | 167 | } |
159 | 168 | ||
160 | void ResourceAccess::open() | 169 | void ResourceAccess::open() |
@@ -200,7 +209,12 @@ void ResourceAccess::connected() | |||
200 | //TODO: serialize instead of blast them all through the socket? | 209 | //TODO: serialize instead of blast them all through the socket? |
201 | log(QString("We have %1 queued commands").arg(d->commandQueue.size())); | 210 | log(QString("We have %1 queued commands").arg(d->commandQueue.size())); |
202 | for (QueuedCommand *command: d->commandQueue) { | 211 | for (QueuedCommand *command: d->commandQueue) { |
203 | command->write(d->socket, ++d->messageId); | 212 | d->messageId++; |
213 | log(QString("Sending command %1").arg(command->commandId)); | ||
214 | if (command->callback) { | ||
215 | registerCallback(d->messageId, command->callback); | ||
216 | } | ||
217 | Commands::write(d->socket, d->messageId, command->commandId, command->buffer, command->bufferSize); | ||
204 | delete command; | 218 | delete command; |
205 | } | 219 | } |
206 | d->commandQueue.clear(); | 220 | d->commandQueue.clear(); |
@@ -234,6 +248,8 @@ void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error) | |||
234 | if (!d->tryOpenTimer->isActive()) { | 248 | if (!d->tryOpenTimer->isActive()) { |
235 | d->tryOpenTimer->start(); | 249 | d->tryOpenTimer->start(); |
236 | } | 250 | } |
251 | } else { | ||
252 | qWarning() << "Failed to start resource"; | ||
237 | } | 253 | } |
238 | } | 254 | } |
239 | 255 | ||
@@ -256,8 +272,7 @@ bool ResourceAccess::processMessageBuffer() | |||
256 | return false; | 272 | return false; |
257 | } | 273 | } |
258 | 274 | ||
259 | //messageId is unused, so commented out | 275 | const uint messageId = *(int*)(d->partialMessageBuffer.constData()); |
260 | //const uint messageId = *(int*)(d->partialMessageBuffer.constData()); | ||
261 | const int commandId = *(int*)(d->partialMessageBuffer.constData() + sizeof(uint)); | 276 | const int commandId = *(int*)(d->partialMessageBuffer.constData() + sizeof(uint)); |
262 | const uint size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint)); | 277 | const uint size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint)); |
263 | 278 | ||
@@ -271,18 +286,15 @@ bool ResourceAccess::processMessageBuffer() | |||
271 | log(QString("Revision updated to: %1").arg(buffer->revision())); | 286 | log(QString("Revision updated to: %1").arg(buffer->revision())); |
272 | emit revisionChanged(buffer->revision()); | 287 | emit revisionChanged(buffer->revision()); |
273 | 288 | ||
274 | //FIXME: The result handler should be called on completion of the synchronize command, and not upon arbitrary revision updates. | ||
275 | for(auto handler : d->synchronizeResultHandler) { | ||
276 | //FIXME: we should associate the handler with a buffer->id() to avoid prematurely triggering the result handler from a delayed synchronized response (this is relevant for on-demand syncing). | ||
277 | handler(); | ||
278 | } | ||
279 | d->synchronizeResultHandler.clear(); | ||
280 | break; | 289 | break; |
281 | } | 290 | } |
282 | case Commands::CommandCompletion: { | 291 | case Commands::CommandCompletion: { |
283 | auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); | 292 | auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); |
284 | log(QString("Command %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); | 293 | log(QString("Command %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); |
285 | //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc | 294 | //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc |
295 | |||
296 | //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first | ||
297 | QMetaObject::invokeMethod(this, "callCallbacks", Qt::QueuedConnection, QGenericReturnArgument(), Q_ARG(int, buffer->id())); | ||
286 | break; | 298 | break; |
287 | } | 299 | } |
288 | default: | 300 | default: |
@@ -293,6 +305,14 @@ bool ResourceAccess::processMessageBuffer() | |||
293 | return d->partialMessageBuffer.size() >= headerSize; | 305 | return d->partialMessageBuffer.size() >= headerSize; |
294 | } | 306 | } |
295 | 307 | ||
308 | void ResourceAccess::callCallbacks(int id) | ||
309 | { | ||
310 | for(auto handler : d->resultHandler.values(id)) { | ||
311 | handler(); | ||
312 | } | ||
313 | d->resultHandler.remove(id); | ||
314 | } | ||
315 | |||
296 | void ResourceAccess::log(const QString &message) | 316 | void ResourceAccess::log(const QString &message) |
297 | { | 317 | { |
298 | qDebug() << d->resourceName + ": " + message; | 318 | qDebug() << d->resourceName + ": " + message; |
diff --git a/common/resourceaccess.h b/common/resourceaccess.h index 7416b25..d79c993 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h | |||
@@ -40,8 +40,9 @@ public: | |||
40 | QString resourceName() const; | 40 | QString resourceName() const; |
41 | bool isReady() const; | 41 | bool isReady() const; |
42 | 42 | ||
43 | void sendCommand(int commandId); | 43 | //TODO use jobs |
44 | void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); | 44 | void sendCommand(int commandId, const std::function<void()> &callback = std::function<void()>()); |
45 | void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback); | ||
45 | void synchronizeResource(const std::function<void()> &resultHandler); | 46 | void synchronizeResource(const std::function<void()> &resultHandler); |
46 | 47 | ||
47 | public Q_SLOTS: | 48 | public Q_SLOTS: |
@@ -51,6 +52,7 @@ public Q_SLOTS: | |||
51 | Q_SIGNALS: | 52 | Q_SIGNALS: |
52 | void ready(bool isReady); | 53 | void ready(bool isReady); |
53 | void revisionChanged(unsigned long long revision); | 54 | void revisionChanged(unsigned long long revision); |
55 | void commandCompleted(); | ||
54 | 56 | ||
55 | private Q_SLOTS: | 57 | private Q_SLOTS: |
56 | //TODO: move these to the Private class | 58 | //TODO: move these to the Private class |
@@ -59,9 +61,11 @@ private Q_SLOTS: | |||
59 | void connectionError(QLocalSocket::LocalSocketError error); | 61 | void connectionError(QLocalSocket::LocalSocketError error); |
60 | void readResourceMessage(); | 62 | void readResourceMessage(); |
61 | bool processMessageBuffer(); | 63 | bool processMessageBuffer(); |
64 | void callCallbacks(int id); | ||
62 | 65 | ||
63 | private: | 66 | private: |
64 | void log(const QString &message); | 67 | void log(const QString &message); |
68 | void registerCallback(uint messageId, const std::function<void()> &callback); | ||
65 | 69 | ||
66 | class Private; | 70 | class Private; |
67 | Private * const d; | 71 | Private * const d; |
diff --git a/dummyresource/facade.cpp b/dummyresource/facade.cpp index 458aba6..d3974e9 100644 --- a/dummyresource/facade.cpp +++ b/dummyresource/facade.cpp | |||
@@ -26,8 +26,9 @@ | |||
26 | #include "common/commands.h" | 26 | #include "common/commands.h" |
27 | #include "dummycalendar_generated.h" | 27 | #include "dummycalendar_generated.h" |
28 | #include "event_generated.h" | 28 | #include "event_generated.h" |
29 | #include "entitybuffer_generated.h" | 29 | #include "entity_generated.h" |
30 | #include "metadata_generated.h" | 30 | #include "metadata_generated.h" |
31 | #include <common/entitybuffer.h> | ||
31 | 32 | ||
32 | using namespace DummyCalendar; | 33 | using namespace DummyCalendar; |
33 | using namespace flatbuffers; | 34 | using namespace flatbuffers; |
@@ -199,14 +200,47 @@ void DummyResourceFacade::load(const Akonadi2::Query &query, const std::function | |||
199 | storage->startTransaction(Akonadi2::Storage::ReadOnly); | 200 | storage->startTransaction(Akonadi2::Storage::ReadOnly); |
200 | //Because we have no indexes yet, we always do a full scan | 201 | //Because we have no indexes yet, we always do a full scan |
201 | storage->scan("", [=](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { | 202 | storage->scan("", [=](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { |
202 | qDebug() << QString::fromStdString(std::string(static_cast<char*>(keyValue), keySize)); | 203 | |
203 | auto buffer = Akonadi2::GetEntityBuffer(dataValue); | 204 | //Skip internals |
204 | auto resourceBuffer = GetDummyEvent(buffer->resource()); | 205 | if (QByteArray::fromRawData(static_cast<char*>(keyValue), keySize).startsWith("__internal")) { |
205 | auto metadataBuffer = Akonadi2::GetMetadata(buffer->resource()); | 206 | return true; |
206 | auto localBuffer = Akonadi2::Domain::Buffer::GetEvent(buffer->local()); | 207 | } |
208 | |||
209 | //Extract buffers | ||
210 | Akonadi2::EntityBuffer buffer(dataValue, dataSize); | ||
211 | |||
212 | DummyEvent const *resourceBuffer = 0; | ||
213 | if (auto resourceData = buffer.resourceBuffer()) { | ||
214 | flatbuffers::Verifier verifyer(resourceData->Data(), resourceData->size()); | ||
215 | if (VerifyDummyEventBuffer(verifyer)) { | ||
216 | resourceBuffer = GetDummyEvent(resourceData); | ||
217 | } | ||
218 | } | ||
219 | |||
220 | Akonadi2::Metadata const *metadataBuffer = 0; | ||
221 | if (auto metadataData = buffer.metadataBuffer()) { | ||
222 | flatbuffers::Verifier verifyer(metadataData->Data(), metadataData->size()); | ||
223 | if (Akonadi2::VerifyMetadataBuffer(verifyer)) { | ||
224 | metadataBuffer = Akonadi2::GetMetadata(metadataData); | ||
225 | } | ||
226 | } | ||
227 | |||
228 | Akonadi2::Domain::Buffer::Event const *localBuffer = 0; | ||
229 | if (auto localData = buffer.localBuffer()) { | ||
230 | flatbuffers::Verifier verifyer(localData->Data(), localData->size()); | ||
231 | if (Akonadi2::Domain::Buffer::VerifyEventBuffer(verifyer)) { | ||
232 | localBuffer = Akonadi2::Domain::Buffer::GetEvent(localData); | ||
233 | } | ||
234 | } | ||
235 | |||
236 | if (!resourceBuffer || !metadataBuffer) { | ||
237 | qWarning() << "invalid buffer " << QString::fromStdString(std::string(static_cast<char*>(keyValue), keySize)); | ||
238 | return true; | ||
239 | } | ||
240 | |||
207 | //We probably only want to create all buffers after the scan | 241 | //We probably only want to create all buffers after the scan |
208 | if (preparedQuery && preparedQuery(std::string(static_cast<char*>(keyValue), keySize), resourceBuffer)) { | 242 | if (preparedQuery && preparedQuery(std::string(static_cast<char*>(keyValue), keySize), resourceBuffer)) { |
209 | qint64 revision = metadataBuffer->revision(); | 243 | qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; |
210 | auto adaptor = QSharedPointer<DummyEventAdaptor>::create(); | 244 | auto adaptor = QSharedPointer<DummyEventAdaptor>::create(); |
211 | adaptor->mLocalBuffer = localBuffer; | 245 | adaptor->mLocalBuffer = localBuffer; |
212 | adaptor->mResourceBuffer = resourceBuffer; | 246 | adaptor->mResourceBuffer = resourceBuffer; |
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index 6b93985..c9e4d7a 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp | |||
@@ -19,7 +19,9 @@ | |||
19 | 19 | ||
20 | #include "resourcefactory.h" | 20 | #include "resourcefactory.h" |
21 | #include "facade.h" | 21 | #include "facade.h" |
22 | #include "entitybuffer.h" | ||
22 | #include "dummycalendar_generated.h" | 23 | #include "dummycalendar_generated.h" |
24 | #include "metadata_generated.h" | ||
23 | #include <QUuid> | 25 | #include <QUuid> |
24 | 26 | ||
25 | static std::string createEvent() | 27 | static std::string createEvent() |
@@ -64,51 +66,67 @@ void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &ri | |||
64 | //TODO lookup in rid index instead of doing a full scan | 66 | //TODO lookup in rid index instead of doing a full scan |
65 | const std::string ridString = rid.toStdString(); | 67 | const std::string ridString = rid.toStdString(); |
66 | storage->scan("", [&](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { | 68 | storage->scan("", [&](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { |
67 | auto eventBuffer = DummyCalendar::GetDummyEvent(dataValue); | 69 | if (QByteArray::fromRawData(static_cast<char*>(keyValue), keySize).startsWith("__internal")) { |
68 | if (std::string(eventBuffer->remoteId()->c_str(), eventBuffer->remoteId()->size()) == ridString) { | 70 | return true; |
69 | callback(keyValue, keySize, dataValue, dataSize); | ||
70 | } | 71 | } |
72 | |||
73 | Akonadi2::EntityBuffer::extractResourceBuffer(dataValue, dataSize, [&](const flatbuffers::Vector<uint8_t> *buffer) { | ||
74 | flatbuffers::Verifier verifier(buffer->Data(), buffer->size()); | ||
75 | if (DummyCalendar::VerifyDummyEventBuffer(verifier)) { | ||
76 | DummyCalendar::DummyEvent const *resourceBuffer = DummyCalendar::GetDummyEvent(buffer->Data()); | ||
77 | if (resourceBuffer && resourceBuffer->remoteId()) { | ||
78 | if (std::string(resourceBuffer->remoteId()->c_str(), resourceBuffer->remoteId()->size()) == ridString) { | ||
79 | callback(keyValue, keySize, dataValue, dataSize); | ||
80 | } | ||
81 | } | ||
82 | } | ||
83 | }); | ||
71 | return true; | 84 | return true; |
72 | }); | 85 | }); |
73 | } | 86 | } |
74 | 87 | ||
75 | void DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) | 88 | Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) |
76 | { | 89 | { |
77 | //TODO use a read-only transaction during the complete sync to sync against a defined revision | 90 | return Async::start<void>([this, pipeline](Async::Future<void> &f) { |
78 | 91 | //TODO use a read-only transaction during the complete sync to sync against a defined revision | |
79 | qDebug() << "synchronize with source"; | 92 | auto storage = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); |
80 | 93 | for (auto it = s_dataSource.constBegin(); it != s_dataSource.constEnd(); it++) { | |
81 | auto storage = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); | 94 | bool isNew = true; |
82 | for (auto it = s_dataSource.constBegin(); it != s_dataSource.constEnd(); it++) { | 95 | if (storage->exists()) { |
83 | bool isNew = true; | 96 | findByRemoteId(storage, it.key(), [&](void *keyValue, int keySize, void *dataValue, int dataSize) { |
84 | if (storage->exists()) { | 97 | isNew = false; |
85 | findByRemoteId(storage, it.key(), [&](void *keyValue, int keySize, void *dataValue, int dataSize) { | 98 | }); |
86 | isNew = false; | 99 | } |
87 | }); | 100 | if (isNew) { |
88 | } | 101 | m_fbb.Clear(); |
89 | 102 | ||
90 | if (isNew) { | 103 | const QByteArray data = it.value().toUtf8(); |
91 | //TODO: perhaps it would be more convenient to populate the domain types? | 104 | auto eventBuffer = DummyCalendar::GetDummyEvent(data.data()); |
92 | //Resource specific parts are not accessible that way, but then we would only have to implement the property mapping in one place | 105 | |
93 | const QByteArray data = it.value().toUtf8(); | 106 | //Map the source format to the buffer format (which happens to be an exact copy here) |
94 | auto eventBuffer = DummyCalendar::GetDummyEvent(data.data()); | 107 | auto summary = m_fbb.CreateString(eventBuffer->summary()->c_str()); |
95 | 108 | auto rid = m_fbb.CreateString(it.key().toStdString().c_str()); | |
96 | //Map the source format to the buffer format (which happens to be an exact copy here) | 109 | auto description = m_fbb.CreateString(it.key().toStdString().c_str()); |
97 | auto builder = DummyCalendar::DummyEventBuilder(m_fbb); | 110 | static uint8_t rawData[100]; |
98 | builder.add_summary(m_fbb.CreateString(eventBuffer->summary()->c_str())); | 111 | auto attachment = m_fbb.CreateVector(rawData, 100); |
99 | auto buffer = builder.Finish(); | 112 | |
100 | DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); | 113 | auto builder = DummyCalendar::DummyEventBuilder(m_fbb); |
101 | 114 | builder.add_summary(summary); | |
102 | //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. | 115 | builder.add_remoteId(rid); |
103 | const auto key = QUuid::createUuid().toString().toUtf8(); | 116 | builder.add_description(description); |
104 | //TODO can we really just start populating the buffer and pass the buffer builder? | 117 | builder.add_attachment(attachment); |
105 | qDebug() << "new event"; | 118 | auto buffer = builder.Finish(); |
106 | pipeline->newEntity(key, m_fbb.GetBufferPointer(), m_fbb.GetSize()); | 119 | DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); |
107 | } else { //modification | 120 | //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. |
108 | //TODO diff and create modification if necessary | 121 | const auto key = QUuid::createUuid().toString().toUtf8(); |
122 | pipeline->newEntity(key, m_fbb.GetBufferPointer(), m_fbb.GetSize()); | ||
123 | } else { //modification | ||
124 | //TODO diff and create modification if necessary | ||
125 | } | ||
109 | } | 126 | } |
110 | } | 127 | //TODO find items to remove |
111 | //TODO find items to remove | 128 | f.setFinished(); |
129 | }); | ||
112 | } | 130 | } |
113 | 131 | ||
114 | void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) | 132 | void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) |
diff --git a/dummyresource/resourcefactory.h b/dummyresource/resourcefactory.h index 807a654..dba674f 100644 --- a/dummyresource/resourcefactory.h +++ b/dummyresource/resourcefactory.h | |||
@@ -20,6 +20,7 @@ | |||
20 | #pragma once | 20 | #pragma once |
21 | 21 | ||
22 | #include "common/resource.h" | 22 | #include "common/resource.h" |
23 | #include "async/src/async.h" | ||
23 | 24 | ||
24 | #include <flatbuffers/flatbuffers.h> | 25 | #include <flatbuffers/flatbuffers.h> |
25 | 26 | ||
@@ -30,7 +31,7 @@ class DummyResource : public Akonadi2::Resource | |||
30 | { | 31 | { |
31 | public: | 32 | public: |
32 | DummyResource(); | 33 | DummyResource(); |
33 | void synchronizeWithSource(Akonadi2::Pipeline *pipeline); | 34 | Async::Job<void> synchronizeWithSource(Akonadi2::Pipeline *pipeline); |
34 | void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline); | 35 | void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline); |
35 | 36 | ||
36 | private: | 37 | private: |
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index 328d4d6..8e94213 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp | |||
@@ -35,7 +35,6 @@ | |||
35 | Listener::Listener(const QString &resourceName, QObject *parent) | 35 | Listener::Listener(const QString &resourceName, QObject *parent) |
36 | : QObject(parent), | 36 | : QObject(parent), |
37 | m_server(new QLocalServer(this)), | 37 | m_server(new QLocalServer(this)), |
38 | m_revision(0), | ||
39 | m_resourceName(resourceName), | 38 | m_resourceName(resourceName), |
40 | m_resource(0), | 39 | m_resource(0), |
41 | m_pipeline(new Akonadi2::Pipeline(resourceName, parent)), | 40 | m_pipeline(new Akonadi2::Pipeline(resourceName, parent)), |
@@ -46,18 +45,18 @@ Listener::Listener(const QString &resourceName, QObject *parent) | |||
46 | this, &Listener::refreshRevision); | 45 | this, &Listener::refreshRevision); |
47 | connect(m_server, &QLocalServer::newConnection, | 46 | connect(m_server, &QLocalServer::newConnection, |
48 | this, &Listener::acceptConnection); | 47 | this, &Listener::acceptConnection); |
49 | Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resourceName)); | 48 | log(QString("Trying to open %1").arg(resourceName)); |
50 | if (!m_server->listen(resourceName)) { | 49 | if (!m_server->listen(resourceName)) { |
51 | // FIXME: multiple starts need to be handled here | 50 | // FIXME: multiple starts need to be handled here |
52 | m_server->removeServer(resourceName); | 51 | m_server->removeServer(resourceName); |
53 | if (!m_server->listen(resourceName)) { | 52 | if (!m_server->listen(resourceName)) { |
54 | Akonadi2::Console::main()->log("Utter failure to start server"); | 53 | log("Utter failure to start server"); |
55 | exit(-1); | 54 | exit(-1); |
56 | } | 55 | } |
57 | } | 56 | } |
58 | 57 | ||
59 | if (m_server->isListening()) { | 58 | if (m_server->isListening()) { |
60 | Akonadi2::Console::main()->log(QString("Listening on %1").arg(m_server->serverName())); | 59 | log(QString("Listening on %1").arg(m_server->serverName())); |
61 | } | 60 | } |
62 | 61 | ||
63 | //TODO: experiment with different timeouts | 62 | //TODO: experiment with different timeouts |
@@ -73,19 +72,6 @@ Listener::~Listener() | |||
73 | { | 72 | { |
74 | } | 73 | } |
75 | 74 | ||
76 | void Listener::setRevision(unsigned long long revision) | ||
77 | { | ||
78 | if (m_revision != revision) { | ||
79 | m_revision = revision; | ||
80 | updateClientsWithRevision(); | ||
81 | } | ||
82 | } | ||
83 | |||
84 | unsigned long long Listener::revision() const | ||
85 | { | ||
86 | return m_revision; | ||
87 | } | ||
88 | |||
89 | void Listener::closeAllConnections() | 75 | void Listener::closeAllConnections() |
90 | { | 76 | { |
91 | for (Client &client: m_connections) { | 77 | for (Client &client: m_connections) { |
@@ -101,14 +87,14 @@ void Listener::closeAllConnections() | |||
101 | 87 | ||
102 | void Listener::acceptConnection() | 88 | void Listener::acceptConnection() |
103 | { | 89 | { |
104 | Akonadi2::Console::main()->log(QString("Accepting connection")); | 90 | log(QString("Accepting connection")); |
105 | QLocalSocket *socket = m_server->nextPendingConnection(); | 91 | QLocalSocket *socket = m_server->nextPendingConnection(); |
106 | 92 | ||
107 | if (!socket) { | 93 | if (!socket) { |
108 | return; | 94 | return; |
109 | } | 95 | } |
110 | 96 | ||
111 | Akonadi2::Console::main()->log("Got a connection"); | 97 | log("Got a connection"); |
112 | Client client("Unknown Client", socket); | 98 | Client client("Unknown Client", socket); |
113 | connect(socket, &QIODevice::readyRead, | 99 | connect(socket, &QIODevice::readyRead, |
114 | this, &Listener::readFromSocket); | 100 | this, &Listener::readFromSocket); |
@@ -125,12 +111,12 @@ void Listener::clientDropped() | |||
125 | return; | 111 | return; |
126 | } | 112 | } |
127 | 113 | ||
128 | Akonadi2::Console::main()->log("Dropping connection..."); | 114 | log("Dropping connection..."); |
129 | QMutableVectorIterator<Client> it(m_connections); | 115 | QMutableVectorIterator<Client> it(m_connections); |
130 | while (it.hasNext()) { | 116 | while (it.hasNext()) { |
131 | const Client &client = it.next(); | 117 | const Client &client = it.next(); |
132 | if (client.socket == socket) { | 118 | if (client.socket == socket) { |
133 | Akonadi2::Console::main()->log(QString(" dropped... %1").arg(client.name)); | 119 | log(QString(" dropped... %1").arg(client.name)); |
134 | it.remove(); | 120 | it.remove(); |
135 | break; | 121 | break; |
136 | } | 122 | } |
@@ -154,7 +140,7 @@ void Listener::readFromSocket() | |||
154 | return; | 140 | return; |
155 | } | 141 | } |
156 | 142 | ||
157 | Akonadi2::Console::main()->log("Reading from socket..."); | 143 | log("Reading from socket..."); |
158 | for (Client &client: m_connections) { | 144 | for (Client &client: m_connections) { |
159 | if (client.socket == socket) { | 145 | if (client.socket == socket) { |
160 | client.commandBuffer += socket->readAll(); | 146 | client.commandBuffer += socket->readAll(); |
@@ -188,6 +174,57 @@ void Listener::processClientBuffers() | |||
188 | } | 174 | } |
189 | } | 175 | } |
190 | 176 | ||
177 | void Listener::processCommand(int commandId, uint messageId, Client &client, uint size, const std::function<void()> &callback) | ||
178 | { | ||
179 | switch (commandId) { | ||
180 | case Akonadi2::Commands::HandshakeCommand: { | ||
181 | flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); | ||
182 | if (Akonadi2::VerifyHandshakeBuffer(verifier)) { | ||
183 | auto buffer = Akonadi2::GetHandshake(client.commandBuffer.constData()); | ||
184 | client.name = buffer->name()->c_str(); | ||
185 | sendCurrentRevision(client); | ||
186 | } else { | ||
187 | qWarning() << "received invalid command"; | ||
188 | } | ||
189 | break; | ||
190 | } | ||
191 | case Akonadi2::Commands::SynchronizeCommand: { | ||
192 | log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); | ||
193 | loadResource(); | ||
194 | if (m_resource) { | ||
195 | qDebug() << "synchronizing"; | ||
196 | m_resource->synchronizeWithSource(m_pipeline).then<void>([callback](Async::Future<void> &f){ | ||
197 | //FIXME: the command is complete once all pipelines have been drained, track progress and async emit result | ||
198 | callback(); | ||
199 | f.setFinished(); | ||
200 | }).exec(); | ||
201 | return; | ||
202 | } else { | ||
203 | qWarning() << "No resource loaded"; | ||
204 | } | ||
205 | break; | ||
206 | } | ||
207 | case Akonadi2::Commands::FetchEntityCommand: | ||
208 | case Akonadi2::Commands::DeleteEntityCommand: | ||
209 | case Akonadi2::Commands::ModifyEntityCommand: | ||
210 | case Akonadi2::Commands::CreateEntityCommand: | ||
211 | log(QString("\tCommand id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); | ||
212 | m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); | ||
213 | break; | ||
214 | default: | ||
215 | if (commandId > Akonadi2::Commands::CustomCommand) { | ||
216 | loadResource(); | ||
217 | if (m_resource) { | ||
218 | m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); | ||
219 | } | ||
220 | } else { | ||
221 | //TODO: handle error: we don't know wtf this command is | ||
222 | } | ||
223 | break; | ||
224 | } | ||
225 | callback(); | ||
226 | } | ||
227 | |||
191 | bool Listener::processClientBuffer(Client &client) | 228 | bool Listener::processClientBuffer(Client &client) |
192 | { | 229 | { |
193 | static const int headerSize = Akonadi2::Commands::headerSize(); | 230 | static const int headerSize = Akonadi2::Commands::headerSize(); |
@@ -195,58 +232,22 @@ bool Listener::processClientBuffer(Client &client) | |||
195 | return false; | 232 | return false; |
196 | } | 233 | } |
197 | 234 | ||
198 | int commandId; | 235 | const uint messageId = *(uint*)client.commandBuffer.constData(); |
199 | uint messageId, size; | 236 | const int commandId = *(int*)(client.commandBuffer.constData() + sizeof(uint)); |
200 | messageId = *(uint*)client.commandBuffer.constData(); | 237 | const uint size = *(uint*)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); |
201 | commandId = *(int*)(client.commandBuffer.constData() + sizeof(uint)); | ||
202 | size = *(uint*)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); | ||
203 | 238 | ||
204 | //TODO: reject messages above a certain size? | 239 | //TODO: reject messages above a certain size? |
205 | 240 | ||
206 | if (size <= uint(client.commandBuffer.size() - headerSize)) { | 241 | if (size <= uint(client.commandBuffer.size() - headerSize)) { |
207 | client.commandBuffer.remove(0, headerSize); | 242 | client.commandBuffer.remove(0, headerSize); |
208 | 243 | ||
209 | switch (commandId) { | 244 | processCommand(commandId, messageId, client, size, [this, messageId, commandId, &client]() { |
210 | case Akonadi2::Commands::HandshakeCommand: { | 245 | log(QString("\tCompleted command messageid %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); |
211 | flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); | 246 | //FIXME, client needs to become a shared pointer and not a reference, or we have to search through m_connections everytime. |
212 | if (Akonadi2::VerifyHandshakeBuffer(verifier)) { | 247 | sendCommandCompleted(client, messageId); |
213 | auto buffer = Akonadi2::GetHandshake(client.commandBuffer.constData()); | 248 | }); |
214 | client.name = buffer->name()->c_str(); | ||
215 | sendCurrentRevision(client); | ||
216 | } | ||
217 | break; | ||
218 | } | ||
219 | case Akonadi2::Commands::SynchronizeCommand: { | ||
220 | Akonadi2::Console::main()->log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); | ||
221 | loadResource(); | ||
222 | if (m_resource) { | ||
223 | m_resource->synchronizeWithSource(m_pipeline); | ||
224 | } | ||
225 | break; | ||
226 | } | ||
227 | case Akonadi2::Commands::FetchEntityCommand: | ||
228 | case Akonadi2::Commands::DeleteEntityCommand: | ||
229 | case Akonadi2::Commands::ModifyEntityCommand: | ||
230 | case Akonadi2::Commands::CreateEntityCommand: | ||
231 | Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); | ||
232 | m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); | ||
233 | break; | ||
234 | default: | ||
235 | if (commandId > Akonadi2::Commands::CustomCommand) { | ||
236 | loadResource(); | ||
237 | if (m_resource) { | ||
238 | m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); | ||
239 | } | ||
240 | } else { | ||
241 | //TODO: handle error: we don't know wtf this command is | ||
242 | } | ||
243 | break; | ||
244 | } | ||
245 | |||
246 | //TODO: async commands == async sendCommandCompleted | ||
247 | Akonadi2::Console::main()->log(QString("\tCompleted command id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); | ||
248 | client.commandBuffer.remove(0, size); | 249 | client.commandBuffer.remove(0, size); |
249 | sendCommandCompleted(client, messageId); | 250 | |
250 | return client.commandBuffer.size() >= headerSize; | 251 | return client.commandBuffer.size() >= headerSize; |
251 | } | 252 | } |
252 | 253 | ||
@@ -259,7 +260,7 @@ void Listener::sendCurrentRevision(Client &client) | |||
259 | return; | 260 | return; |
260 | } | 261 | } |
261 | 262 | ||
262 | auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); | 263 | auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_pipeline->storage().maxRevision()); |
263 | Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); | 264 | Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); |
264 | Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); | 265 | Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); |
265 | m_fbb.Clear(); | 266 | m_fbb.Clear(); |
@@ -279,14 +280,12 @@ void Listener::sendCommandCompleted(Client &client, uint messageId) | |||
279 | 280 | ||
280 | void Listener::refreshRevision() | 281 | void Listener::refreshRevision() |
281 | { | 282 | { |
282 | //TODO this should be coming out of m_pipeline->storage() | ||
283 | ++m_revision; | ||
284 | updateClientsWithRevision(); | 283 | updateClientsWithRevision(); |
285 | } | 284 | } |
286 | 285 | ||
287 | void Listener::updateClientsWithRevision() | 286 | void Listener::updateClientsWithRevision() |
288 | { | 287 | { |
289 | auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); | 288 | auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_pipeline->storage().maxRevision()); |
290 | Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); | 289 | Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); |
291 | 290 | ||
292 | for (const Client &client: m_connections) { | 291 | for (const Client &client: m_connections) { |
@@ -308,13 +307,19 @@ void Listener::loadResource() | |||
308 | Akonadi2::ResourceFactory *resourceFactory = Akonadi2::ResourceFactory::load(m_resourceName); | 307 | Akonadi2::ResourceFactory *resourceFactory = Akonadi2::ResourceFactory::load(m_resourceName); |
309 | if (resourceFactory) { | 308 | if (resourceFactory) { |
310 | m_resource = resourceFactory->createResource(); | 309 | m_resource = resourceFactory->createResource(); |
311 | Akonadi2::Console::main()->log(QString("Resource factory: %1").arg((qlonglong)resourceFactory)); | 310 | log(QString("Resource factory: %1").arg((qlonglong)resourceFactory)); |
312 | Akonadi2::Console::main()->log(QString("\tResource: %1").arg((qlonglong)m_resource)); | 311 | log(QString("\tResource: %1").arg((qlonglong)m_resource)); |
313 | //TODO: this doesn't really list all the facades .. fix | 312 | //TODO: this doesn't really list all the facades .. fix |
314 | Akonadi2::Console::main()->log(QString("\tFacades: %1").arg(Akonadi2::FacadeFactory::instance().getFacade<Akonadi2::Domain::Event>(m_resourceName)->type())); | 313 | log(QString("\tFacades: %1").arg(Akonadi2::FacadeFactory::instance().getFacade<Akonadi2::Domain::Event>(m_resourceName)->type())); |
315 | } else { | 314 | } else { |
316 | Akonadi2::Console::main()->log(QString("Failed to load resource %1").arg(m_resourceName)); | 315 | log(QString("Failed to load resource %1").arg(m_resourceName)); |
317 | } | 316 | } |
318 | //TODO: on failure ... what? | 317 | //TODO: on failure ... what? |
318 | //Enter broken state? | ||
319 | } | ||
320 | |||
321 | void Listener::log(const QString &message) | ||
322 | { | ||
323 | Akonadi2::Console::main()->log("Listener: " + message); | ||
319 | } | 324 | } |
320 | 325 | ||
diff --git a/synchronizer/listener.h b/synchronizer/listener.h index 357ae37..4c35191 100644 --- a/synchronizer/listener.h +++ b/synchronizer/listener.h | |||
@@ -61,9 +61,6 @@ public: | |||
61 | Listener(const QString &resourceName, QObject *parent = 0); | 61 | Listener(const QString &resourceName, QObject *parent = 0); |
62 | ~Listener(); | 62 | ~Listener(); |
63 | 63 | ||
64 | void setRevision(unsigned long long revision); | ||
65 | unsigned long long revision() const; | ||
66 | |||
67 | Q_SIGNALS: | 64 | Q_SIGNALS: |
68 | void noClients(); | 65 | void noClients(); |
69 | 66 | ||
@@ -79,15 +76,16 @@ private Q_SLOTS: | |||
79 | void refreshRevision(); | 76 | void refreshRevision(); |
80 | 77 | ||
81 | private: | 78 | private: |
79 | void processCommand(int commandId, uint messageId, Client &client, uint size, const std::function<void()> &callback); | ||
82 | bool processClientBuffer(Client &client); | 80 | bool processClientBuffer(Client &client); |
83 | void sendCurrentRevision(Client &client); | 81 | void sendCurrentRevision(Client &client); |
84 | void sendCommandCompleted(Client &client, uint messageId); | 82 | void sendCommandCompleted(Client &client, uint messageId); |
85 | void updateClientsWithRevision(); | 83 | void updateClientsWithRevision(); |
86 | void loadResource(); | 84 | void loadResource(); |
85 | void log(const QString &); | ||
87 | 86 | ||
88 | QLocalServer *m_server; | 87 | QLocalServer *m_server; |
89 | QVector<Client> m_connections; | 88 | QVector<Client> m_connections; |
90 | unsigned long long m_revision; | ||
91 | flatbuffers::FlatBufferBuilder m_fbb; | 89 | flatbuffers::FlatBufferBuilder m_fbb; |
92 | const QString m_resourceName; | 90 | const QString m_resourceName; |
93 | Akonadi2::Resource *m_resource; | 91 | Akonadi2::Resource *m_resource; |
diff --git a/tests/dummyresourcefacadetest.cpp b/tests/dummyresourcefacadetest.cpp index d815e9b..e4d27fc 100644 --- a/tests/dummyresourcefacadetest.cpp +++ b/tests/dummyresourcefacadetest.cpp | |||
@@ -51,17 +51,10 @@ private Q_SLOTS: | |||
51 | query.ids << "key50"; | 51 | query.ids << "key50"; |
52 | query.resources << "dummyresource"; | 52 | query.resources << "dummyresource"; |
53 | 53 | ||
54 | auto result = Akonadi2::Store::load<Akonadi2::Domain::Event>(query); | 54 | //FIXME avoid sync somehow. No synchronizer access here (perhaps configure the instance above accordingly?) |
55 | bool complete = false; | 55 | async::SyncListResult<Akonadi2::Domain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::Domain::Event>(query)); |
56 | QVector<Akonadi2::Domain::Event::Ptr> results; | 56 | result.exec(); |
57 | result->onAdded([&results](const Akonadi2::Domain::Event::Ptr &e) { | 57 | QCOMPARE(result.size(), 1); |
58 | results << e; | ||
59 | }); | ||
60 | result->onComplete([&complete]() { | ||
61 | complete = true; | ||
62 | }); | ||
63 | QTRY_VERIFY(complete); | ||
64 | QCOMPARE(results.size(), 1); | ||
65 | 58 | ||
66 | Akonadi2::Storage storage(testDataPath, dbName); | 59 | Akonadi2::Storage storage(testDataPath, dbName); |
67 | storage.removeFromDisk(); | 60 | storage.removeFromDisk(); |
diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp index 75d29de..0c02675 100644 --- a/tests/dummyresourcetest.cpp +++ b/tests/dummyresourcetest.cpp | |||
@@ -2,7 +2,7 @@ | |||
2 | 2 | ||
3 | #include <QString> | 3 | #include <QString> |
4 | 4 | ||
5 | #include "common/resource.h" | 5 | #include "dummyresource/resourcefactory.h" |
6 | #include "clientapi.h" | 6 | #include "clientapi.h" |
7 | 7 | ||
8 | class DummyResourceTest : public QObject | 8 | class DummyResourceTest : public QObject |
@@ -13,12 +13,23 @@ private Q_SLOTS: | |||
13 | { | 13 | { |
14 | auto factory = Akonadi2::ResourceFactory::load("org.kde.dummy"); | 14 | auto factory = Akonadi2::ResourceFactory::load("org.kde.dummy"); |
15 | QVERIFY(factory); | 15 | QVERIFY(factory); |
16 | Akonadi2::Storage store(Akonadi2::Store::storageLocation(), "org.kde.dummy", Akonadi2::Storage::ReadWrite); | ||
17 | store.removeFromDisk(); | ||
16 | } | 18 | } |
17 | 19 | ||
18 | void cleanupTestCase() | 20 | void cleanupTestCase() |
19 | { | 21 | { |
20 | } | 22 | } |
21 | 23 | ||
24 | void testResource() | ||
25 | { | ||
26 | Akonadi2::Pipeline pipeline("org.kde.dummy"); | ||
27 | DummyResource resource; | ||
28 | auto job = resource.synchronizeWithSource(&pipeline); | ||
29 | auto future = job.exec(); | ||
30 | QTRY_VERIFY(future.isFinished()); | ||
31 | } | ||
32 | |||
22 | void testSync() | 33 | void testSync() |
23 | { | 34 | { |
24 | Akonadi2::Query query; | 35 | Akonadi2::Query query; |
@@ -27,6 +38,8 @@ private Q_SLOTS: | |||
27 | async::SyncListResult<Akonadi2::Domain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::Domain::Event>(query)); | 38 | async::SyncListResult<Akonadi2::Domain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::Domain::Event>(query)); |
28 | result.exec(); | 39 | result.exec(); |
29 | QVERIFY(!result.isEmpty()); | 40 | QVERIFY(!result.isEmpty()); |
41 | auto value = result.first(); | ||
42 | qDebug() << value->getProperty("summary"); | ||
30 | } | 43 | } |
31 | 44 | ||
32 | }; | 45 | }; |