summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/CMakeLists.txt7
-rw-r--r--common/entity.fbs (renamed from common/entitybuffer.fbs)4
-rw-r--r--common/entitybuffer.cpp53
-rw-r--r--common/entitybuffer.h22
-rw-r--r--common/pipeline.cpp29
-rw-r--r--common/pipeline.h1
-rw-r--r--common/resource.cpp6
-rw-r--r--common/resource.h3
-rw-r--r--common/resourceaccess.cpp96
-rw-r--r--common/resourceaccess.h8
10 files changed, 167 insertions, 62 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index ec13e07..1a9a812 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -8,7 +8,7 @@ generate_flatbuffers(
8 commands/modifyentity 8 commands/modifyentity
9 commands/revisionupdate 9 commands/revisionupdate
10 domain/event 10 domain/event
11 entitybuffer 11 entity
12 metadata 12 metadata
13) 13)
14 14
@@ -17,10 +17,11 @@ if (STORAGE_unqlite)
17 set(storage_SRCS unqlite/unqlite.c storage_unqlite.cpp) 17 set(storage_SRCS unqlite/unqlite.c storage_unqlite.cpp)
18else (STORAGE_unqlite) 18else (STORAGE_unqlite)
19 set(storage_SRCS storage_lmdb.cpp) 19 set(storage_SRCS storage_lmdb.cpp)
20 set(storage_LIBS lmdb) 20 set(storage_LIBS ${lmdb})
21endif (STORAGE_unqlite) 21endif (STORAGE_unqlite)
22 22
23set(command_SRCS 23set(command_SRCS
24 entitybuffer.cpp
24 clientapi.cpp 25 clientapi.cpp
25 commands.cpp 26 commands.cpp
26 console.cpp 27 console.cpp
@@ -35,7 +36,7 @@ add_library(${PROJECT_NAME} SHARED ${command_SRCS})
35generate_export_header(${PROJECT_NAME} BASE_NAME Akonadi2Common EXPORT_FILE_NAME akonadi2common_export.h) 36generate_export_header(${PROJECT_NAME} BASE_NAME Akonadi2Common EXPORT_FILE_NAME akonadi2common_export.h)
36SET_TARGET_PROPERTIES(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX) 37SET_TARGET_PROPERTIES(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX)
37qt5_use_modules(${PROJECT_NAME} Widgets Network) 38qt5_use_modules(${PROJECT_NAME} Widgets Network)
38target_link_libraries(${PROJECT_NAME} ${storage_LIBS}) 39target_link_libraries(${PROJECT_NAME} ${storage_LIBS} akonadi2async)
39install(TARGETS ${PROJECT_NAME} DESTINATION lib) 40install(TARGETS ${PROJECT_NAME} DESTINATION lib)
40 41
41add_subdirectory(test) 42add_subdirectory(test)
diff --git a/common/entitybuffer.fbs b/common/entity.fbs
index 28c9b2a..565b1a7 100644
--- a/common/entitybuffer.fbs
+++ b/common/entity.fbs
@@ -1,9 +1,9 @@
1namespace Akonadi2; 1namespace Akonadi2;
2 2
3table EntityBuffer { 3table Entity {
4 metadata: [ubyte]; 4 metadata: [ubyte];
5 resource: [ubyte]; 5 resource: [ubyte];
6 local: [ubyte]; 6 local: [ubyte];
7} 7}
8 8
9root_type EntityBuffer; 9root_type Entity;
diff --git a/common/entitybuffer.cpp b/common/entitybuffer.cpp
new file mode 100644
index 0000000..a78e91d
--- /dev/null
+++ b/common/entitybuffer.cpp
@@ -0,0 +1,53 @@
1#include "entitybuffer.h"
2
3#include "entity_generated.h"
4#include "metadata_generated.h"
5#include <QDebug>
6
7using namespace Akonadi2;
8
9EntityBuffer::EntityBuffer(void *dataValue, int dataSize)
10 : mEntity(nullptr)
11{
12 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(dataValue), dataSize);
13 // Q_ASSERT(Akonadi2::VerifyEntity(verifyer));
14 if (!Akonadi2::VerifyEntityBuffer(verifyer)) {
15 qWarning() << "invalid buffer";
16 } else {
17 mEntity = Akonadi2::GetEntity(dataValue);
18 }
19}
20
21const flatbuffers::Vector<uint8_t>* EntityBuffer::resourceBuffer()
22{
23 if (!mEntity) {
24 qDebug() << "no buffer";
25 return nullptr;
26 }
27 return mEntity->resource();
28}
29
30const flatbuffers::Vector<uint8_t>* EntityBuffer::metadataBuffer()
31{
32 if (!mEntity) {
33 return nullptr;
34 }
35 return mEntity->metadata();
36}
37
38const flatbuffers::Vector<uint8_t>* EntityBuffer::localBuffer()
39{
40 if (!mEntity) {
41 return nullptr;
42 }
43 return mEntity->local();
44}
45
46void EntityBuffer::extractResourceBuffer(void *dataValue, int dataSize, const std::function<void(const flatbuffers::Vector<uint8_t> *)> &handler)
47{
48 Akonadi2::EntityBuffer buffer(dataValue, dataSize);
49 if (auto resourceData = buffer.resourceBuffer()) {
50 handler(resourceData);
51 }
52}
53
diff --git a/common/entitybuffer.h b/common/entitybuffer.h
new file mode 100644
index 0000000..2a7150e
--- /dev/null
+++ b/common/entitybuffer.h
@@ -0,0 +1,22 @@
1#pragma once
2
3#include <functional>
4#include <flatbuffers/flatbuffers.h>
5
6namespace Akonadi2 {
7class Entity;
8
9class EntityBuffer {
10public:
11 EntityBuffer(void *dataValue, int size);
12 const flatbuffers::Vector<uint8_t> *resourceBuffer();
13 const flatbuffers::Vector<uint8_t> *metadataBuffer();
14 const flatbuffers::Vector<uint8_t> *localBuffer();
15
16 static void extractResourceBuffer(void *dataValue, int dataSize, const std::function<void(const flatbuffers::Vector<uint8_t> *)> &handler);
17private:
18 const Entity *mEntity;
19};
20
21}
22
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 5ca8b95..dc6d389 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -24,7 +24,7 @@
24#include <QStandardPaths> 24#include <QStandardPaths>
25#include <QVector> 25#include <QVector>
26#include <QDebug> 26#include <QDebug>
27#include "entitybuffer_generated.h" 27#include "entity_generated.h"
28#include "metadata_generated.h" 28#include "metadata_generated.h"
29 29
30namespace Akonadi2 30namespace Akonadi2
@@ -76,33 +76,31 @@ void Pipeline::newEntity(const QByteArray &key, void *resourceBufferData, size_t
76{ 76{
77 const qint64 newRevision = storage().maxRevision() + 1; 77 const qint64 newRevision = storage().maxRevision() + 1;
78 78
79 flatbuffers::FlatBufferBuilder fbb;
80 auto builder = Akonadi2::EntityBufferBuilder(fbb);
81 79
80 std::vector<uint8_t> metadataData;
82 //Add metadata buffer 81 //Add metadata buffer
83 { 82 {
84 flatbuffers::FlatBufferBuilder metadataFbb; 83 flatbuffers::FlatBufferBuilder metadataFbb;
85 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); 84 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb);
86 metadataBuilder.add_revision(newRevision); 85 metadataBuilder.add_revision(newRevision);
87 auto metadataBuffer = metadataBuilder.Finish(); 86 auto metadataBuffer = metadataBuilder.Finish();
88 Akonadi2::FinishMetadataBuffer(fbb, metadataBuffer); 87 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer);
89 //TODO use memcpy 88 metadataData.resize(metadataFbb.GetSize());
90 auto metadata = fbb.CreateVector<uint8_t>(metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); 89 std::copy_n(metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), back_inserter(metadataData));
91 builder.add_metadata(metadata);
92 } 90 }
93 91
94 //Add resource buffer
95 {
96 //TODO use memcpy
97 auto resource = fbb.CreateVector<uint8_t>(static_cast<uint8_t*>(resourceBufferData), size);
98 builder.add_resource(resource);
99 }
100 92
93 flatbuffers::FlatBufferBuilder fbb;
94 auto metadata = fbb.CreateVector<uint8_t>(metadataData.data(), metadataData.size());
95 auto resource = fbb.CreateVector<uint8_t>(static_cast<uint8_t*>(resourceBufferData), size);
96 auto builder = Akonadi2::EntityBuilder(fbb);
97 builder.add_metadata(metadata);
98 builder.add_resource(resource);
101 //We don't have a local buffer yet 99 //We don't have a local buffer yet
102 // builder.add_local(); 100 // builder.add_local();
103 101
104 auto buffer = builder.Finish(); 102 auto buffer = builder.Finish();
105 Akonadi2::FinishEntityBufferBuffer(fbb, buffer); 103 Akonadi2::FinishEntityBuffer(fbb, buffer);
106 104
107 qDebug() << "writing new entity" << key; 105 qDebug() << "writing new entity" << key;
108 storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); 106 storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize());
@@ -163,6 +161,9 @@ void Pipeline::pipelineCompleted(const PipelineState &state)
163 emit revisionUpdated(); 161 emit revisionUpdated();
164 } 162 }
165 scheduleStep(); 163 scheduleStep();
164 if (d->activePipelines.isEmpty()) {
165 emit pipelinesDrained();
166 }
166} 167}
167 168
168 169
diff --git a/common/pipeline.h b/common/pipeline.h
index 159cc1c..6ef8703 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -55,6 +55,7 @@ public:
55 55
56Q_SIGNALS: 56Q_SIGNALS:
57 void revisionUpdated(); 57 void revisionUpdated();
58 void pipelinesDrained();
58 59
59private Q_SLOTS: 60private Q_SLOTS:
60 void stepPipelines(); 61 void stepPipelines();
diff --git a/common/resource.cpp b/common/resource.cpp
index ae28485..bba6609 100644
--- a/common/resource.cpp
+++ b/common/resource.cpp
@@ -48,9 +48,11 @@ void Resource::processCommand(int commandId, const QByteArray &data, uint size,
48 pipeline->null(); 48 pipeline->null();
49} 49}
50 50
51void Resource::synchronizeWithSource(Pipeline *pipeline) 51Async::Job<void> Resource::synchronizeWithSource(Pipeline *pipeline)
52{ 52{
53 pipeline->null(); 53 return Async::start<void>([pipeline](Async::Future<void> &f) {
54 pipeline->null();
55 });
54} 56}
55 57
56class ResourceFactory::Private 58class ResourceFactory::Private
diff --git a/common/resource.h b/common/resource.h
index 0f65e1f..fb42c1b 100644
--- a/common/resource.h
+++ b/common/resource.h
@@ -21,6 +21,7 @@
21#include <akonadi2common_export.h> 21#include <akonadi2common_export.h>
22#include <clientapi.h> 22#include <clientapi.h>
23#include <pipeline.h> 23#include <pipeline.h>
24#include <async/src/async.h>
24 25
25namespace Akonadi2 26namespace Akonadi2
26{ 27{
@@ -33,7 +34,7 @@ public:
33 virtual ~Resource(); 34 virtual ~Resource();
34 35
35 virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); 36 virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline);
36 virtual void synchronizeWithSource(Pipeline *pipeline); 37 virtual Async::Job<void> synchronizeWithSource(Pipeline *pipeline);
37 38
38private: 39private:
39 class Private; 40 class Private;
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 1706ac4..31b9e79 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -37,38 +37,36 @@ namespace Akonadi2
37class QueuedCommand 37class QueuedCommand
38{ 38{
39public: 39public:
40 QueuedCommand(int commandId) 40 QueuedCommand(int commandId, const std::function<void()> &callback)
41 : m_commandId(commandId), 41 : commandId(commandId),
42 m_bufferSize(0), 42 bufferSize(0),
43 m_buffer(0) 43 buffer(0),
44 callback(callback)
44 {} 45 {}
45 46
46 QueuedCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) 47 QueuedCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback)
47 : m_commandId(commandId), 48 : commandId(commandId),
48 m_bufferSize(fbb.GetSize()), 49 bufferSize(fbb.GetSize()),
49 m_buffer(new char[m_bufferSize]) 50 buffer(new char[bufferSize]),
51 callback(callback)
50 { 52 {
51 memcpy(m_buffer, fbb.GetBufferPointer(), m_bufferSize); 53 memcpy(buffer, fbb.GetBufferPointer(), bufferSize);
52 } 54 }
53 55
54 ~QueuedCommand() 56 ~QueuedCommand()
55 { 57 {
56 delete[] m_buffer; 58 delete[] buffer;
57 }
58
59 void write(QIODevice *device, uint messageId)
60 {
61 // Console::main()->log(QString("\tSending queued command %1").arg(m_commandId));
62 Commands::write(device, messageId, m_commandId, m_buffer, m_bufferSize);
63 } 59 }
64 60
65private: 61private:
66 QueuedCommand(const QueuedCommand &other); 62 QueuedCommand(const QueuedCommand &other);
67 QueuedCommand &operator=(const QueuedCommand &rhs); 63 QueuedCommand &operator=(const QueuedCommand &rhs);
68 64
69 const int m_commandId; 65public:
70 const uint m_bufferSize; 66 const int commandId;
71 char *m_buffer; 67 const uint bufferSize;
68 char *buffer;
69 std::function<void()> callback;
72}; 70};
73 71
74class ResourceAccess::Private 72class ResourceAccess::Private
@@ -82,7 +80,7 @@ public:
82 QByteArray partialMessageBuffer; 80 QByteArray partialMessageBuffer;
83 flatbuffers::FlatBufferBuilder fbb; 81 flatbuffers::FlatBufferBuilder fbb;
84 QVector<QueuedCommand *> commandQueue; 82 QVector<QueuedCommand *> commandQueue;
85 QVector<std::function<void()> > synchronizeResultHandler; 83 QMultiMap<uint, std::function<void()> > resultHandler;
86 uint messageId; 84 uint messageId;
87}; 85};
88 86
@@ -130,31 +128,42 @@ bool ResourceAccess::isReady() const
130 return d->socket->isValid(); 128 return d->socket->isValid();
131} 129}
132 130
133void ResourceAccess::sendCommand(int commandId) 131void ResourceAccess::registerCallback(uint messageId, const std::function<void()> &callback)
132{
133 d->resultHandler.insert(messageId, callback);
134}
135
136void ResourceAccess::sendCommand(int commandId, const std::function<void()> &callback)
134{ 137{
135 if (isReady()) { 138 if (isReady()) {
136 log(QString("Sending command %1").arg(commandId)); 139 log(QString("Sending command %1").arg(commandId));
137 Commands::write(d->socket, ++d->messageId, commandId); 140 d->messageId++;
141 if (callback) {
142 registerCallback(d->messageId, callback);
143 }
144 Commands::write(d->socket, d->messageId, commandId);
138 } else { 145 } else {
139 d->commandQueue << new QueuedCommand(commandId); 146 d->commandQueue << new QueuedCommand(commandId, callback);
140 } 147 }
141} 148}
142 149
143void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) 150void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback)
144{ 151{
145 if (isReady()) { 152 if (isReady()) {
146 log(QString("Sending command %1").arg(commandId)); 153 log(QString("Sending command %1").arg(commandId));
147 Commands::write(d->socket, ++d->messageId, commandId, fbb); 154 d->messageId++;
155 if (callback) {
156 registerCallback(d->messageId, callback);
157 }
158 Commands::write(d->socket, d->messageId, commandId, fbb);
148 } else { 159 } else {
149 d->commandQueue << new QueuedCommand(commandId, fbb); 160 d->commandQueue << new QueuedCommand(commandId, fbb, callback);
150 } 161 }
151} 162}
152 163
153void ResourceAccess::synchronizeResource(const std::function<void()> &resultHandler) 164void ResourceAccess::synchronizeResource(const std::function<void()> &resultHandler)
154{ 165{
155 sendCommand(Commands::SynchronizeCommand); 166 sendCommand(Commands::SynchronizeCommand, resultHandler);
156 //TODO: this should be implemented as a job, so we don't need to store the result handler as member
157 d->synchronizeResultHandler << resultHandler;
158} 167}
159 168
160void ResourceAccess::open() 169void ResourceAccess::open()
@@ -200,7 +209,12 @@ void ResourceAccess::connected()
200 //TODO: serialize instead of blast them all through the socket? 209 //TODO: serialize instead of blast them all through the socket?
201 log(QString("We have %1 queued commands").arg(d->commandQueue.size())); 210 log(QString("We have %1 queued commands").arg(d->commandQueue.size()));
202 for (QueuedCommand *command: d->commandQueue) { 211 for (QueuedCommand *command: d->commandQueue) {
203 command->write(d->socket, ++d->messageId); 212 d->messageId++;
213 log(QString("Sending command %1").arg(command->commandId));
214 if (command->callback) {
215 registerCallback(d->messageId, command->callback);
216 }
217 Commands::write(d->socket, d->messageId, command->commandId, command->buffer, command->bufferSize);
204 delete command; 218 delete command;
205 } 219 }
206 d->commandQueue.clear(); 220 d->commandQueue.clear();
@@ -234,6 +248,8 @@ void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error)
234 if (!d->tryOpenTimer->isActive()) { 248 if (!d->tryOpenTimer->isActive()) {
235 d->tryOpenTimer->start(); 249 d->tryOpenTimer->start();
236 } 250 }
251 } else {
252 qWarning() << "Failed to start resource";
237 } 253 }
238} 254}
239 255
@@ -256,8 +272,7 @@ bool ResourceAccess::processMessageBuffer()
256 return false; 272 return false;
257 } 273 }
258 274
259 //messageId is unused, so commented out 275 const uint messageId = *(int*)(d->partialMessageBuffer.constData());
260 //const uint messageId = *(int*)(d->partialMessageBuffer.constData());
261 const int commandId = *(int*)(d->partialMessageBuffer.constData() + sizeof(uint)); 276 const int commandId = *(int*)(d->partialMessageBuffer.constData() + sizeof(uint));
262 const uint size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint)); 277 const uint size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint));
263 278
@@ -271,18 +286,15 @@ bool ResourceAccess::processMessageBuffer()
271 log(QString("Revision updated to: %1").arg(buffer->revision())); 286 log(QString("Revision updated to: %1").arg(buffer->revision()));
272 emit revisionChanged(buffer->revision()); 287 emit revisionChanged(buffer->revision());
273 288
274 //FIXME: The result handler should be called on completion of the synchronize command, and not upon arbitrary revision updates.
275 for(auto handler : d->synchronizeResultHandler) {
276 //FIXME: we should associate the handler with a buffer->id() to avoid prematurely triggering the result handler from a delayed synchronized response (this is relevant for on-demand syncing).
277 handler();
278 }
279 d->synchronizeResultHandler.clear();
280 break; 289 break;
281 } 290 }
282 case Commands::CommandCompletion: { 291 case Commands::CommandCompletion: {
283 auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); 292 auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize);
284 log(QString("Command %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); 293 log(QString("Command %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully"));
285 //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc 294 //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc
295
296 //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first
297 QMetaObject::invokeMethod(this, "callCallbacks", Qt::QueuedConnection, QGenericReturnArgument(), Q_ARG(int, buffer->id()));
286 break; 298 break;
287 } 299 }
288 default: 300 default:
@@ -293,6 +305,14 @@ bool ResourceAccess::processMessageBuffer()
293 return d->partialMessageBuffer.size() >= headerSize; 305 return d->partialMessageBuffer.size() >= headerSize;
294} 306}
295 307
308void ResourceAccess::callCallbacks(int id)
309{
310 for(auto handler : d->resultHandler.values(id)) {
311 handler();
312 }
313 d->resultHandler.remove(id);
314}
315
296void ResourceAccess::log(const QString &message) 316void ResourceAccess::log(const QString &message)
297{ 317{
298 qDebug() << d->resourceName + ": " + message; 318 qDebug() << d->resourceName + ": " + message;
diff --git a/common/resourceaccess.h b/common/resourceaccess.h
index 7416b25..d79c993 100644
--- a/common/resourceaccess.h
+++ b/common/resourceaccess.h
@@ -40,8 +40,9 @@ public:
40 QString resourceName() const; 40 QString resourceName() const;
41 bool isReady() const; 41 bool isReady() const;
42 42
43 void sendCommand(int commandId); 43 //TODO use jobs
44 void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); 44 void sendCommand(int commandId, const std::function<void()> &callback = std::function<void()>());
45 void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback);
45 void synchronizeResource(const std::function<void()> &resultHandler); 46 void synchronizeResource(const std::function<void()> &resultHandler);
46 47
47public Q_SLOTS: 48public Q_SLOTS:
@@ -51,6 +52,7 @@ public Q_SLOTS:
51Q_SIGNALS: 52Q_SIGNALS:
52 void ready(bool isReady); 53 void ready(bool isReady);
53 void revisionChanged(unsigned long long revision); 54 void revisionChanged(unsigned long long revision);
55 void commandCompleted();
54 56
55private Q_SLOTS: 57private Q_SLOTS:
56 //TODO: move these to the Private class 58 //TODO: move these to the Private class
@@ -59,9 +61,11 @@ private Q_SLOTS:
59 void connectionError(QLocalSocket::LocalSocketError error); 61 void connectionError(QLocalSocket::LocalSocketError error);
60 void readResourceMessage(); 62 void readResourceMessage();
61 bool processMessageBuffer(); 63 bool processMessageBuffer();
64 void callCallbacks(int id);
62 65
63private: 66private:
64 void log(const QString &message); 67 void log(const QString &message);
68 void registerCallback(uint messageId, const std::function<void()> &callback);
65 69
66 class Private; 70 class Private;
67 Private * const d; 71 Private * const d;