summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--async/src/CMakeLists.txt7
-rw-r--r--common/CMakeLists.txt7
-rw-r--r--common/entity.fbs (renamed from common/entitybuffer.fbs)4
-rw-r--r--common/entitybuffer.cpp53
-rw-r--r--common/entitybuffer.h22
-rw-r--r--common/pipeline.cpp29
-rw-r--r--common/pipeline.h1
-rw-r--r--common/resource.cpp6
-rw-r--r--common/resource.h3
-rw-r--r--common/resourceaccess.cpp96
-rw-r--r--common/resourceaccess.h8
-rw-r--r--dummyresource/facade.cpp48
-rw-r--r--dummyresource/resourcefactory.cpp94
-rw-r--r--dummyresource/resourcefactory.h3
-rw-r--r--synchronizer/listener.cpp155
-rw-r--r--synchronizer/listener.h6
-rw-r--r--tests/dummyresourcefacadetest.cpp15
-rw-r--r--tests/dummyresourcetest.cpp15
18 files changed, 371 insertions, 201 deletions
diff --git a/async/src/CMakeLists.txt b/async/src/CMakeLists.txt
index a371da0..85700ed 100644
--- a/async/src/CMakeLists.txt
+++ b/async/src/CMakeLists.txt
@@ -1,3 +1,5 @@
1project(akonadi2async)
2
1include_directories(${CMAKE_CURRENT_BINARY_DIR}) 3include_directories(${CMAKE_CURRENT_BINARY_DIR})
2 4
3set(async_SRCS 5set(async_SRCS
@@ -5,5 +7,6 @@ set(async_SRCS
5 future.cpp 7 future.cpp
6) 8)
7 9
8add_library(akonadi2async SHARED ${async_SRCS}) 10add_library(${PROJECT_NAME} SHARED ${async_SRCS})
9target_link_libraries(akonadi2async Qt5::Core) 11target_link_libraries(${PROJECT_NAME} Qt5::Core)
12install(TARGETS ${PROJECT_NAME} DESTINATION lib)
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index ec13e07..1a9a812 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -8,7 +8,7 @@ generate_flatbuffers(
8 commands/modifyentity 8 commands/modifyentity
9 commands/revisionupdate 9 commands/revisionupdate
10 domain/event 10 domain/event
11 entitybuffer 11 entity
12 metadata 12 metadata
13) 13)
14 14
@@ -17,10 +17,11 @@ if (STORAGE_unqlite)
17 set(storage_SRCS unqlite/unqlite.c storage_unqlite.cpp) 17 set(storage_SRCS unqlite/unqlite.c storage_unqlite.cpp)
18else (STORAGE_unqlite) 18else (STORAGE_unqlite)
19 set(storage_SRCS storage_lmdb.cpp) 19 set(storage_SRCS storage_lmdb.cpp)
20 set(storage_LIBS lmdb) 20 set(storage_LIBS ${lmdb})
21endif (STORAGE_unqlite) 21endif (STORAGE_unqlite)
22 22
23set(command_SRCS 23set(command_SRCS
24 entitybuffer.cpp
24 clientapi.cpp 25 clientapi.cpp
25 commands.cpp 26 commands.cpp
26 console.cpp 27 console.cpp
@@ -35,7 +36,7 @@ add_library(${PROJECT_NAME} SHARED ${command_SRCS})
35generate_export_header(${PROJECT_NAME} BASE_NAME Akonadi2Common EXPORT_FILE_NAME akonadi2common_export.h) 36generate_export_header(${PROJECT_NAME} BASE_NAME Akonadi2Common EXPORT_FILE_NAME akonadi2common_export.h)
36SET_TARGET_PROPERTIES(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX) 37SET_TARGET_PROPERTIES(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX)
37qt5_use_modules(${PROJECT_NAME} Widgets Network) 38qt5_use_modules(${PROJECT_NAME} Widgets Network)
38target_link_libraries(${PROJECT_NAME} ${storage_LIBS}) 39target_link_libraries(${PROJECT_NAME} ${storage_LIBS} akonadi2async)
39install(TARGETS ${PROJECT_NAME} DESTINATION lib) 40install(TARGETS ${PROJECT_NAME} DESTINATION lib)
40 41
41add_subdirectory(test) 42add_subdirectory(test)
diff --git a/common/entitybuffer.fbs b/common/entity.fbs
index 28c9b2a..565b1a7 100644
--- a/common/entitybuffer.fbs
+++ b/common/entity.fbs
@@ -1,9 +1,9 @@
1namespace Akonadi2; 1namespace Akonadi2;
2 2
3table EntityBuffer { 3table Entity {
4 metadata: [ubyte]; 4 metadata: [ubyte];
5 resource: [ubyte]; 5 resource: [ubyte];
6 local: [ubyte]; 6 local: [ubyte];
7} 7}
8 8
9root_type EntityBuffer; 9root_type Entity;
diff --git a/common/entitybuffer.cpp b/common/entitybuffer.cpp
new file mode 100644
index 0000000..a78e91d
--- /dev/null
+++ b/common/entitybuffer.cpp
@@ -0,0 +1,53 @@
1#include "entitybuffer.h"
2
3#include "entity_generated.h"
4#include "metadata_generated.h"
5#include <QDebug>
6
7using namespace Akonadi2;
8
9EntityBuffer::EntityBuffer(void *dataValue, int dataSize)
10 : mEntity(nullptr)
11{
12 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(dataValue), dataSize);
13 // Q_ASSERT(Akonadi2::VerifyEntity(verifyer));
14 if (!Akonadi2::VerifyEntityBuffer(verifyer)) {
15 qWarning() << "invalid buffer";
16 } else {
17 mEntity = Akonadi2::GetEntity(dataValue);
18 }
19}
20
21const flatbuffers::Vector<uint8_t>* EntityBuffer::resourceBuffer()
22{
23 if (!mEntity) {
24 qDebug() << "no buffer";
25 return nullptr;
26 }
27 return mEntity->resource();
28}
29
30const flatbuffers::Vector<uint8_t>* EntityBuffer::metadataBuffer()
31{
32 if (!mEntity) {
33 return nullptr;
34 }
35 return mEntity->metadata();
36}
37
38const flatbuffers::Vector<uint8_t>* EntityBuffer::localBuffer()
39{
40 if (!mEntity) {
41 return nullptr;
42 }
43 return mEntity->local();
44}
45
46void EntityBuffer::extractResourceBuffer(void *dataValue, int dataSize, const std::function<void(const flatbuffers::Vector<uint8_t> *)> &handler)
47{
48 Akonadi2::EntityBuffer buffer(dataValue, dataSize);
49 if (auto resourceData = buffer.resourceBuffer()) {
50 handler(resourceData);
51 }
52}
53
diff --git a/common/entitybuffer.h b/common/entitybuffer.h
new file mode 100644
index 0000000..2a7150e
--- /dev/null
+++ b/common/entitybuffer.h
@@ -0,0 +1,22 @@
1#pragma once
2
3#include <functional>
4#include <flatbuffers/flatbuffers.h>
5
6namespace Akonadi2 {
7class Entity;
8
9class EntityBuffer {
10public:
11 EntityBuffer(void *dataValue, int size);
12 const flatbuffers::Vector<uint8_t> *resourceBuffer();
13 const flatbuffers::Vector<uint8_t> *metadataBuffer();
14 const flatbuffers::Vector<uint8_t> *localBuffer();
15
16 static void extractResourceBuffer(void *dataValue, int dataSize, const std::function<void(const flatbuffers::Vector<uint8_t> *)> &handler);
17private:
18 const Entity *mEntity;
19};
20
21}
22
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 5ca8b95..dc6d389 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -24,7 +24,7 @@
24#include <QStandardPaths> 24#include <QStandardPaths>
25#include <QVector> 25#include <QVector>
26#include <QDebug> 26#include <QDebug>
27#include "entitybuffer_generated.h" 27#include "entity_generated.h"
28#include "metadata_generated.h" 28#include "metadata_generated.h"
29 29
30namespace Akonadi2 30namespace Akonadi2
@@ -76,33 +76,31 @@ void Pipeline::newEntity(const QByteArray &key, void *resourceBufferData, size_t
76{ 76{
77 const qint64 newRevision = storage().maxRevision() + 1; 77 const qint64 newRevision = storage().maxRevision() + 1;
78 78
79 flatbuffers::FlatBufferBuilder fbb;
80 auto builder = Akonadi2::EntityBufferBuilder(fbb);
81 79
80 std::vector<uint8_t> metadataData;
82 //Add metadata buffer 81 //Add metadata buffer
83 { 82 {
84 flatbuffers::FlatBufferBuilder metadataFbb; 83 flatbuffers::FlatBufferBuilder metadataFbb;
85 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); 84 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb);
86 metadataBuilder.add_revision(newRevision); 85 metadataBuilder.add_revision(newRevision);
87 auto metadataBuffer = metadataBuilder.Finish(); 86 auto metadataBuffer = metadataBuilder.Finish();
88 Akonadi2::FinishMetadataBuffer(fbb, metadataBuffer); 87 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer);
89 //TODO use memcpy 88 metadataData.resize(metadataFbb.GetSize());
90 auto metadata = fbb.CreateVector<uint8_t>(metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); 89 std::copy_n(metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), back_inserter(metadataData));
91 builder.add_metadata(metadata);
92 } 90 }
93 91
94 //Add resource buffer
95 {
96 //TODO use memcpy
97 auto resource = fbb.CreateVector<uint8_t>(static_cast<uint8_t*>(resourceBufferData), size);
98 builder.add_resource(resource);
99 }
100 92
93 flatbuffers::FlatBufferBuilder fbb;
94 auto metadata = fbb.CreateVector<uint8_t>(metadataData.data(), metadataData.size());
95 auto resource = fbb.CreateVector<uint8_t>(static_cast<uint8_t*>(resourceBufferData), size);
96 auto builder = Akonadi2::EntityBuilder(fbb);
97 builder.add_metadata(metadata);
98 builder.add_resource(resource);
101 //We don't have a local buffer yet 99 //We don't have a local buffer yet
102 // builder.add_local(); 100 // builder.add_local();
103 101
104 auto buffer = builder.Finish(); 102 auto buffer = builder.Finish();
105 Akonadi2::FinishEntityBufferBuffer(fbb, buffer); 103 Akonadi2::FinishEntityBuffer(fbb, buffer);
106 104
107 qDebug() << "writing new entity" << key; 105 qDebug() << "writing new entity" << key;
108 storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); 106 storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize());
@@ -163,6 +161,9 @@ void Pipeline::pipelineCompleted(const PipelineState &state)
163 emit revisionUpdated(); 161 emit revisionUpdated();
164 } 162 }
165 scheduleStep(); 163 scheduleStep();
164 if (d->activePipelines.isEmpty()) {
165 emit pipelinesDrained();
166 }
166} 167}
167 168
168 169
diff --git a/common/pipeline.h b/common/pipeline.h
index 159cc1c..6ef8703 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -55,6 +55,7 @@ public:
55 55
56Q_SIGNALS: 56Q_SIGNALS:
57 void revisionUpdated(); 57 void revisionUpdated();
58 void pipelinesDrained();
58 59
59private Q_SLOTS: 60private Q_SLOTS:
60 void stepPipelines(); 61 void stepPipelines();
diff --git a/common/resource.cpp b/common/resource.cpp
index ae28485..bba6609 100644
--- a/common/resource.cpp
+++ b/common/resource.cpp
@@ -48,9 +48,11 @@ void Resource::processCommand(int commandId, const QByteArray &data, uint size,
48 pipeline->null(); 48 pipeline->null();
49} 49}
50 50
51void Resource::synchronizeWithSource(Pipeline *pipeline) 51Async::Job<void> Resource::synchronizeWithSource(Pipeline *pipeline)
52{ 52{
53 pipeline->null(); 53 return Async::start<void>([pipeline](Async::Future<void> &f) {
54 pipeline->null();
55 });
54} 56}
55 57
56class ResourceFactory::Private 58class ResourceFactory::Private
diff --git a/common/resource.h b/common/resource.h
index 0f65e1f..fb42c1b 100644
--- a/common/resource.h
+++ b/common/resource.h
@@ -21,6 +21,7 @@
21#include <akonadi2common_export.h> 21#include <akonadi2common_export.h>
22#include <clientapi.h> 22#include <clientapi.h>
23#include <pipeline.h> 23#include <pipeline.h>
24#include <async/src/async.h>
24 25
25namespace Akonadi2 26namespace Akonadi2
26{ 27{
@@ -33,7 +34,7 @@ public:
33 virtual ~Resource(); 34 virtual ~Resource();
34 35
35 virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); 36 virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline);
36 virtual void synchronizeWithSource(Pipeline *pipeline); 37 virtual Async::Job<void> synchronizeWithSource(Pipeline *pipeline);
37 38
38private: 39private:
39 class Private; 40 class Private;
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 1706ac4..31b9e79 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -37,38 +37,36 @@ namespace Akonadi2
37class QueuedCommand 37class QueuedCommand
38{ 38{
39public: 39public:
40 QueuedCommand(int commandId) 40 QueuedCommand(int commandId, const std::function<void()> &callback)
41 : m_commandId(commandId), 41 : commandId(commandId),
42 m_bufferSize(0), 42 bufferSize(0),
43 m_buffer(0) 43 buffer(0),
44 callback(callback)
44 {} 45 {}
45 46
46 QueuedCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) 47 QueuedCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback)
47 : m_commandId(commandId), 48 : commandId(commandId),
48 m_bufferSize(fbb.GetSize()), 49 bufferSize(fbb.GetSize()),
49 m_buffer(new char[m_bufferSize]) 50 buffer(new char[bufferSize]),
51 callback(callback)
50 { 52 {
51 memcpy(m_buffer, fbb.GetBufferPointer(), m_bufferSize); 53 memcpy(buffer, fbb.GetBufferPointer(), bufferSize);
52 } 54 }
53 55
54 ~QueuedCommand() 56 ~QueuedCommand()
55 { 57 {
56 delete[] m_buffer; 58 delete[] buffer;
57 }
58
59 void write(QIODevice *device, uint messageId)
60 {
61 // Console::main()->log(QString("\tSending queued command %1").arg(m_commandId));
62 Commands::write(device, messageId, m_commandId, m_buffer, m_bufferSize);
63 } 59 }
64 60
65private: 61private:
66 QueuedCommand(const QueuedCommand &other); 62 QueuedCommand(const QueuedCommand &other);
67 QueuedCommand &operator=(const QueuedCommand &rhs); 63 QueuedCommand &operator=(const QueuedCommand &rhs);
68 64
69 const int m_commandId; 65public:
70 const uint m_bufferSize; 66 const int commandId;
71 char *m_buffer; 67 const uint bufferSize;
68 char *buffer;
69 std::function<void()> callback;
72}; 70};
73 71
74class ResourceAccess::Private 72class ResourceAccess::Private
@@ -82,7 +80,7 @@ public:
82 QByteArray partialMessageBuffer; 80 QByteArray partialMessageBuffer;
83 flatbuffers::FlatBufferBuilder fbb; 81 flatbuffers::FlatBufferBuilder fbb;
84 QVector<QueuedCommand *> commandQueue; 82 QVector<QueuedCommand *> commandQueue;
85 QVector<std::function<void()> > synchronizeResultHandler; 83 QMultiMap<uint, std::function<void()> > resultHandler;
86 uint messageId; 84 uint messageId;
87}; 85};
88 86
@@ -130,31 +128,42 @@ bool ResourceAccess::isReady() const
130 return d->socket->isValid(); 128 return d->socket->isValid();
131} 129}
132 130
133void ResourceAccess::sendCommand(int commandId) 131void ResourceAccess::registerCallback(uint messageId, const std::function<void()> &callback)
132{
133 d->resultHandler.insert(messageId, callback);
134}
135
136void ResourceAccess::sendCommand(int commandId, const std::function<void()> &callback)
134{ 137{
135 if (isReady()) { 138 if (isReady()) {
136 log(QString("Sending command %1").arg(commandId)); 139 log(QString("Sending command %1").arg(commandId));
137 Commands::write(d->socket, ++d->messageId, commandId); 140 d->messageId++;
141 if (callback) {
142 registerCallback(d->messageId, callback);
143 }
144 Commands::write(d->socket, d->messageId, commandId);
138 } else { 145 } else {
139 d->commandQueue << new QueuedCommand(commandId); 146 d->commandQueue << new QueuedCommand(commandId, callback);
140 } 147 }
141} 148}
142 149
143void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) 150void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback)
144{ 151{
145 if (isReady()) { 152 if (isReady()) {
146 log(QString("Sending command %1").arg(commandId)); 153 log(QString("Sending command %1").arg(commandId));
147 Commands::write(d->socket, ++d->messageId, commandId, fbb); 154 d->messageId++;
155 if (callback) {
156 registerCallback(d->messageId, callback);
157 }
158 Commands::write(d->socket, d->messageId, commandId, fbb);
148 } else { 159 } else {
149 d->commandQueue << new QueuedCommand(commandId, fbb); 160 d->commandQueue << new QueuedCommand(commandId, fbb, callback);
150 } 161 }
151} 162}
152 163
153void ResourceAccess::synchronizeResource(const std::function<void()> &resultHandler) 164void ResourceAccess::synchronizeResource(const std::function<void()> &resultHandler)
154{ 165{
155 sendCommand(Commands::SynchronizeCommand); 166 sendCommand(Commands::SynchronizeCommand, resultHandler);
156 //TODO: this should be implemented as a job, so we don't need to store the result handler as member
157 d->synchronizeResultHandler << resultHandler;
158} 167}
159 168
160void ResourceAccess::open() 169void ResourceAccess::open()
@@ -200,7 +209,12 @@ void ResourceAccess::connected()
200 //TODO: serialize instead of blast them all through the socket? 209 //TODO: serialize instead of blast them all through the socket?
201 log(QString("We have %1 queued commands").arg(d->commandQueue.size())); 210 log(QString("We have %1 queued commands").arg(d->commandQueue.size()));
202 for (QueuedCommand *command: d->commandQueue) { 211 for (QueuedCommand *command: d->commandQueue) {
203 command->write(d->socket, ++d->messageId); 212 d->messageId++;
213 log(QString("Sending command %1").arg(command->commandId));
214 if (command->callback) {
215 registerCallback(d->messageId, command->callback);
216 }
217 Commands::write(d->socket, d->messageId, command->commandId, command->buffer, command->bufferSize);
204 delete command; 218 delete command;
205 } 219 }
206 d->commandQueue.clear(); 220 d->commandQueue.clear();
@@ -234,6 +248,8 @@ void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error)
234 if (!d->tryOpenTimer->isActive()) { 248 if (!d->tryOpenTimer->isActive()) {
235 d->tryOpenTimer->start(); 249 d->tryOpenTimer->start();
236 } 250 }
251 } else {
252 qWarning() << "Failed to start resource";
237 } 253 }
238} 254}
239 255
@@ -256,8 +272,7 @@ bool ResourceAccess::processMessageBuffer()
256 return false; 272 return false;
257 } 273 }
258 274
259 //messageId is unused, so commented out 275 const uint messageId = *(int*)(d->partialMessageBuffer.constData());
260 //const uint messageId = *(int*)(d->partialMessageBuffer.constData());
261 const int commandId = *(int*)(d->partialMessageBuffer.constData() + sizeof(uint)); 276 const int commandId = *(int*)(d->partialMessageBuffer.constData() + sizeof(uint));
262 const uint size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint)); 277 const uint size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint));
263 278
@@ -271,18 +286,15 @@ bool ResourceAccess::processMessageBuffer()
271 log(QString("Revision updated to: %1").arg(buffer->revision())); 286 log(QString("Revision updated to: %1").arg(buffer->revision()));
272 emit revisionChanged(buffer->revision()); 287 emit revisionChanged(buffer->revision());
273 288
274 //FIXME: The result handler should be called on completion of the synchronize command, and not upon arbitrary revision updates.
275 for(auto handler : d->synchronizeResultHandler) {
276 //FIXME: we should associate the handler with a buffer->id() to avoid prematurely triggering the result handler from a delayed synchronized response (this is relevant for on-demand syncing).
277 handler();
278 }
279 d->synchronizeResultHandler.clear();
280 break; 289 break;
281 } 290 }
282 case Commands::CommandCompletion: { 291 case Commands::CommandCompletion: {
283 auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); 292 auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize);
284 log(QString("Command %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); 293 log(QString("Command %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully"));
285 //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc 294 //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc
295
296 //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first
297 QMetaObject::invokeMethod(this, "callCallbacks", Qt::QueuedConnection, QGenericReturnArgument(), Q_ARG(int, buffer->id()));
286 break; 298 break;
287 } 299 }
288 default: 300 default:
@@ -293,6 +305,14 @@ bool ResourceAccess::processMessageBuffer()
293 return d->partialMessageBuffer.size() >= headerSize; 305 return d->partialMessageBuffer.size() >= headerSize;
294} 306}
295 307
308void ResourceAccess::callCallbacks(int id)
309{
310 for(auto handler : d->resultHandler.values(id)) {
311 handler();
312 }
313 d->resultHandler.remove(id);
314}
315
296void ResourceAccess::log(const QString &message) 316void ResourceAccess::log(const QString &message)
297{ 317{
298 qDebug() << d->resourceName + ": " + message; 318 qDebug() << d->resourceName + ": " + message;
diff --git a/common/resourceaccess.h b/common/resourceaccess.h
index 7416b25..d79c993 100644
--- a/common/resourceaccess.h
+++ b/common/resourceaccess.h
@@ -40,8 +40,9 @@ public:
40 QString resourceName() const; 40 QString resourceName() const;
41 bool isReady() const; 41 bool isReady() const;
42 42
43 void sendCommand(int commandId); 43 //TODO use jobs
44 void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); 44 void sendCommand(int commandId, const std::function<void()> &callback = std::function<void()>());
45 void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback);
45 void synchronizeResource(const std::function<void()> &resultHandler); 46 void synchronizeResource(const std::function<void()> &resultHandler);
46 47
47public Q_SLOTS: 48public Q_SLOTS:
@@ -51,6 +52,7 @@ public Q_SLOTS:
51Q_SIGNALS: 52Q_SIGNALS:
52 void ready(bool isReady); 53 void ready(bool isReady);
53 void revisionChanged(unsigned long long revision); 54 void revisionChanged(unsigned long long revision);
55 void commandCompleted();
54 56
55private Q_SLOTS: 57private Q_SLOTS:
56 //TODO: move these to the Private class 58 //TODO: move these to the Private class
@@ -59,9 +61,11 @@ private Q_SLOTS:
59 void connectionError(QLocalSocket::LocalSocketError error); 61 void connectionError(QLocalSocket::LocalSocketError error);
60 void readResourceMessage(); 62 void readResourceMessage();
61 bool processMessageBuffer(); 63 bool processMessageBuffer();
64 void callCallbacks(int id);
62 65
63private: 66private:
64 void log(const QString &message); 67 void log(const QString &message);
68 void registerCallback(uint messageId, const std::function<void()> &callback);
65 69
66 class Private; 70 class Private;
67 Private * const d; 71 Private * const d;
diff --git a/dummyresource/facade.cpp b/dummyresource/facade.cpp
index 458aba6..d3974e9 100644
--- a/dummyresource/facade.cpp
+++ b/dummyresource/facade.cpp
@@ -26,8 +26,9 @@
26#include "common/commands.h" 26#include "common/commands.h"
27#include "dummycalendar_generated.h" 27#include "dummycalendar_generated.h"
28#include "event_generated.h" 28#include "event_generated.h"
29#include "entitybuffer_generated.h" 29#include "entity_generated.h"
30#include "metadata_generated.h" 30#include "metadata_generated.h"
31#include <common/entitybuffer.h>
31 32
32using namespace DummyCalendar; 33using namespace DummyCalendar;
33using namespace flatbuffers; 34using namespace flatbuffers;
@@ -199,14 +200,47 @@ void DummyResourceFacade::load(const Akonadi2::Query &query, const std::function
199 storage->startTransaction(Akonadi2::Storage::ReadOnly); 200 storage->startTransaction(Akonadi2::Storage::ReadOnly);
200 //Because we have no indexes yet, we always do a full scan 201 //Because we have no indexes yet, we always do a full scan
201 storage->scan("", [=](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { 202 storage->scan("", [=](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool {
202 qDebug() << QString::fromStdString(std::string(static_cast<char*>(keyValue), keySize)); 203
203 auto buffer = Akonadi2::GetEntityBuffer(dataValue); 204 //Skip internals
204 auto resourceBuffer = GetDummyEvent(buffer->resource()); 205 if (QByteArray::fromRawData(static_cast<char*>(keyValue), keySize).startsWith("__internal")) {
205 auto metadataBuffer = Akonadi2::GetMetadata(buffer->resource()); 206 return true;
206 auto localBuffer = Akonadi2::Domain::Buffer::GetEvent(buffer->local()); 207 }
208
209 //Extract buffers
210 Akonadi2::EntityBuffer buffer(dataValue, dataSize);
211
212 DummyEvent const *resourceBuffer = 0;
213 if (auto resourceData = buffer.resourceBuffer()) {
214 flatbuffers::Verifier verifyer(resourceData->Data(), resourceData->size());
215 if (VerifyDummyEventBuffer(verifyer)) {
216 resourceBuffer = GetDummyEvent(resourceData);
217 }
218 }
219
220 Akonadi2::Metadata const *metadataBuffer = 0;
221 if (auto metadataData = buffer.metadataBuffer()) {
222 flatbuffers::Verifier verifyer(metadataData->Data(), metadataData->size());
223 if (Akonadi2::VerifyMetadataBuffer(verifyer)) {
224 metadataBuffer = Akonadi2::GetMetadata(metadataData);
225 }
226 }
227
228 Akonadi2::Domain::Buffer::Event const *localBuffer = 0;
229 if (auto localData = buffer.localBuffer()) {
230 flatbuffers::Verifier verifyer(localData->Data(), localData->size());
231 if (Akonadi2::Domain::Buffer::VerifyEventBuffer(verifyer)) {
232 localBuffer = Akonadi2::Domain::Buffer::GetEvent(localData);
233 }
234 }
235
236 if (!resourceBuffer || !metadataBuffer) {
237 qWarning() << "invalid buffer " << QString::fromStdString(std::string(static_cast<char*>(keyValue), keySize));
238 return true;
239 }
240
207 //We probably only want to create all buffers after the scan 241 //We probably only want to create all buffers after the scan
208 if (preparedQuery && preparedQuery(std::string(static_cast<char*>(keyValue), keySize), resourceBuffer)) { 242 if (preparedQuery && preparedQuery(std::string(static_cast<char*>(keyValue), keySize), resourceBuffer)) {
209 qint64 revision = metadataBuffer->revision(); 243 qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
210 auto adaptor = QSharedPointer<DummyEventAdaptor>::create(); 244 auto adaptor = QSharedPointer<DummyEventAdaptor>::create();
211 adaptor->mLocalBuffer = localBuffer; 245 adaptor->mLocalBuffer = localBuffer;
212 adaptor->mResourceBuffer = resourceBuffer; 246 adaptor->mResourceBuffer = resourceBuffer;
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp
index 6b93985..c9e4d7a 100644
--- a/dummyresource/resourcefactory.cpp
+++ b/dummyresource/resourcefactory.cpp
@@ -19,7 +19,9 @@
19 19
20#include "resourcefactory.h" 20#include "resourcefactory.h"
21#include "facade.h" 21#include "facade.h"
22#include "entitybuffer.h"
22#include "dummycalendar_generated.h" 23#include "dummycalendar_generated.h"
24#include "metadata_generated.h"
23#include <QUuid> 25#include <QUuid>
24 26
25static std::string createEvent() 27static std::string createEvent()
@@ -64,51 +66,67 @@ void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &ri
64 //TODO lookup in rid index instead of doing a full scan 66 //TODO lookup in rid index instead of doing a full scan
65 const std::string ridString = rid.toStdString(); 67 const std::string ridString = rid.toStdString();
66 storage->scan("", [&](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { 68 storage->scan("", [&](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool {
67 auto eventBuffer = DummyCalendar::GetDummyEvent(dataValue); 69 if (QByteArray::fromRawData(static_cast<char*>(keyValue), keySize).startsWith("__internal")) {
68 if (std::string(eventBuffer->remoteId()->c_str(), eventBuffer->remoteId()->size()) == ridString) { 70 return true;
69 callback(keyValue, keySize, dataValue, dataSize);
70 } 71 }
72
73 Akonadi2::EntityBuffer::extractResourceBuffer(dataValue, dataSize, [&](const flatbuffers::Vector<uint8_t> *buffer) {
74 flatbuffers::Verifier verifier(buffer->Data(), buffer->size());
75 if (DummyCalendar::VerifyDummyEventBuffer(verifier)) {
76 DummyCalendar::DummyEvent const *resourceBuffer = DummyCalendar::GetDummyEvent(buffer->Data());
77 if (resourceBuffer && resourceBuffer->remoteId()) {
78 if (std::string(resourceBuffer->remoteId()->c_str(), resourceBuffer->remoteId()->size()) == ridString) {
79 callback(keyValue, keySize, dataValue, dataSize);
80 }
81 }
82 }
83 });
71 return true; 84 return true;
72 }); 85 });
73} 86}
74 87
75void DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) 88Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline)
76{ 89{
77 //TODO use a read-only transaction during the complete sync to sync against a defined revision 90 return Async::start<void>([this, pipeline](Async::Future<void> &f) {
78 91 //TODO use a read-only transaction during the complete sync to sync against a defined revision
79 qDebug() << "synchronize with source"; 92 auto storage = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::Store::storageLocation(), "org.kde.dummy");
80 93 for (auto it = s_dataSource.constBegin(); it != s_dataSource.constEnd(); it++) {
81 auto storage = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); 94 bool isNew = true;
82 for (auto it = s_dataSource.constBegin(); it != s_dataSource.constEnd(); it++) { 95 if (storage->exists()) {
83 bool isNew = true; 96 findByRemoteId(storage, it.key(), [&](void *keyValue, int keySize, void *dataValue, int dataSize) {
84 if (storage->exists()) { 97 isNew = false;
85 findByRemoteId(storage, it.key(), [&](void *keyValue, int keySize, void *dataValue, int dataSize) { 98 });
86 isNew = false; 99 }
87 }); 100 if (isNew) {
88 } 101 m_fbb.Clear();
89 102
90 if (isNew) { 103 const QByteArray data = it.value().toUtf8();
91 //TODO: perhaps it would be more convenient to populate the domain types? 104 auto eventBuffer = DummyCalendar::GetDummyEvent(data.data());
92 //Resource specific parts are not accessible that way, but then we would only have to implement the property mapping in one place 105
93 const QByteArray data = it.value().toUtf8(); 106 //Map the source format to the buffer format (which happens to be an exact copy here)
94 auto eventBuffer = DummyCalendar::GetDummyEvent(data.data()); 107 auto summary = m_fbb.CreateString(eventBuffer->summary()->c_str());
95 108 auto rid = m_fbb.CreateString(it.key().toStdString().c_str());
96 //Map the source format to the buffer format (which happens to be an exact copy here) 109 auto description = m_fbb.CreateString(it.key().toStdString().c_str());
97 auto builder = DummyCalendar::DummyEventBuilder(m_fbb); 110 static uint8_t rawData[100];
98 builder.add_summary(m_fbb.CreateString(eventBuffer->summary()->c_str())); 111 auto attachment = m_fbb.CreateVector(rawData, 100);
99 auto buffer = builder.Finish(); 112
100 DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); 113 auto builder = DummyCalendar::DummyEventBuilder(m_fbb);
101 114 builder.add_summary(summary);
102 //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. 115 builder.add_remoteId(rid);
103 const auto key = QUuid::createUuid().toString().toUtf8(); 116 builder.add_description(description);
104 //TODO can we really just start populating the buffer and pass the buffer builder? 117 builder.add_attachment(attachment);
105 qDebug() << "new event"; 118 auto buffer = builder.Finish();
106 pipeline->newEntity(key, m_fbb.GetBufferPointer(), m_fbb.GetSize()); 119 DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer);
107 } else { //modification 120 //TODO toRFC4122 would probably be more efficient, but results in non-printable keys.
108 //TODO diff and create modification if necessary 121 const auto key = QUuid::createUuid().toString().toUtf8();
122 pipeline->newEntity(key, m_fbb.GetBufferPointer(), m_fbb.GetSize());
123 } else { //modification
124 //TODO diff and create modification if necessary
125 }
109 } 126 }
110 } 127 //TODO find items to remove
111 //TODO find items to remove 128 f.setFinished();
129 });
112} 130}
113 131
114void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) 132void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline)
diff --git a/dummyresource/resourcefactory.h b/dummyresource/resourcefactory.h
index 807a654..dba674f 100644
--- a/dummyresource/resourcefactory.h
+++ b/dummyresource/resourcefactory.h
@@ -20,6 +20,7 @@
20#pragma once 20#pragma once
21 21
22#include "common/resource.h" 22#include "common/resource.h"
23#include "async/src/async.h"
23 24
24#include <flatbuffers/flatbuffers.h> 25#include <flatbuffers/flatbuffers.h>
25 26
@@ -30,7 +31,7 @@ class DummyResource : public Akonadi2::Resource
30{ 31{
31public: 32public:
32 DummyResource(); 33 DummyResource();
33 void synchronizeWithSource(Akonadi2::Pipeline *pipeline); 34 Async::Job<void> synchronizeWithSource(Akonadi2::Pipeline *pipeline);
34 void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline); 35 void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline);
35 36
36private: 37private:
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp
index 328d4d6..8e94213 100644
--- a/synchronizer/listener.cpp
+++ b/synchronizer/listener.cpp
@@ -35,7 +35,6 @@
35Listener::Listener(const QString &resourceName, QObject *parent) 35Listener::Listener(const QString &resourceName, QObject *parent)
36 : QObject(parent), 36 : QObject(parent),
37 m_server(new QLocalServer(this)), 37 m_server(new QLocalServer(this)),
38 m_revision(0),
39 m_resourceName(resourceName), 38 m_resourceName(resourceName),
40 m_resource(0), 39 m_resource(0),
41 m_pipeline(new Akonadi2::Pipeline(resourceName, parent)), 40 m_pipeline(new Akonadi2::Pipeline(resourceName, parent)),
@@ -46,18 +45,18 @@ Listener::Listener(const QString &resourceName, QObject *parent)
46 this, &Listener::refreshRevision); 45 this, &Listener::refreshRevision);
47 connect(m_server, &QLocalServer::newConnection, 46 connect(m_server, &QLocalServer::newConnection,
48 this, &Listener::acceptConnection); 47 this, &Listener::acceptConnection);
49 Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resourceName)); 48 log(QString("Trying to open %1").arg(resourceName));
50 if (!m_server->listen(resourceName)) { 49 if (!m_server->listen(resourceName)) {
51 // FIXME: multiple starts need to be handled here 50 // FIXME: multiple starts need to be handled here
52 m_server->removeServer(resourceName); 51 m_server->removeServer(resourceName);
53 if (!m_server->listen(resourceName)) { 52 if (!m_server->listen(resourceName)) {
54 Akonadi2::Console::main()->log("Utter failure to start server"); 53 log("Utter failure to start server");
55 exit(-1); 54 exit(-1);
56 } 55 }
57 } 56 }
58 57
59 if (m_server->isListening()) { 58 if (m_server->isListening()) {
60 Akonadi2::Console::main()->log(QString("Listening on %1").arg(m_server->serverName())); 59 log(QString("Listening on %1").arg(m_server->serverName()));
61 } 60 }
62 61
63 //TODO: experiment with different timeouts 62 //TODO: experiment with different timeouts
@@ -73,19 +72,6 @@ Listener::~Listener()
73{ 72{
74} 73}
75 74
76void Listener::setRevision(unsigned long long revision)
77{
78 if (m_revision != revision) {
79 m_revision = revision;
80 updateClientsWithRevision();
81 }
82}
83
84unsigned long long Listener::revision() const
85{
86 return m_revision;
87}
88
89void Listener::closeAllConnections() 75void Listener::closeAllConnections()
90{ 76{
91 for (Client &client: m_connections) { 77 for (Client &client: m_connections) {
@@ -101,14 +87,14 @@ void Listener::closeAllConnections()
101 87
102void Listener::acceptConnection() 88void Listener::acceptConnection()
103{ 89{
104 Akonadi2::Console::main()->log(QString("Accepting connection")); 90 log(QString("Accepting connection"));
105 QLocalSocket *socket = m_server->nextPendingConnection(); 91 QLocalSocket *socket = m_server->nextPendingConnection();
106 92
107 if (!socket) { 93 if (!socket) {
108 return; 94 return;
109 } 95 }
110 96
111 Akonadi2::Console::main()->log("Got a connection"); 97 log("Got a connection");
112 Client client("Unknown Client", socket); 98 Client client("Unknown Client", socket);
113 connect(socket, &QIODevice::readyRead, 99 connect(socket, &QIODevice::readyRead,
114 this, &Listener::readFromSocket); 100 this, &Listener::readFromSocket);
@@ -125,12 +111,12 @@ void Listener::clientDropped()
125 return; 111 return;
126 } 112 }
127 113
128 Akonadi2::Console::main()->log("Dropping connection..."); 114 log("Dropping connection...");
129 QMutableVectorIterator<Client> it(m_connections); 115 QMutableVectorIterator<Client> it(m_connections);
130 while (it.hasNext()) { 116 while (it.hasNext()) {
131 const Client &client = it.next(); 117 const Client &client = it.next();
132 if (client.socket == socket) { 118 if (client.socket == socket) {
133 Akonadi2::Console::main()->log(QString(" dropped... %1").arg(client.name)); 119 log(QString(" dropped... %1").arg(client.name));
134 it.remove(); 120 it.remove();
135 break; 121 break;
136 } 122 }
@@ -154,7 +140,7 @@ void Listener::readFromSocket()
154 return; 140 return;
155 } 141 }
156 142
157 Akonadi2::Console::main()->log("Reading from socket..."); 143 log("Reading from socket...");
158 for (Client &client: m_connections) { 144 for (Client &client: m_connections) {
159 if (client.socket == socket) { 145 if (client.socket == socket) {
160 client.commandBuffer += socket->readAll(); 146 client.commandBuffer += socket->readAll();
@@ -188,6 +174,57 @@ void Listener::processClientBuffers()
188 } 174 }
189} 175}
190 176
177void Listener::processCommand(int commandId, uint messageId, Client &client, uint size, const std::function<void()> &callback)
178{
179 switch (commandId) {
180 case Akonadi2::Commands::HandshakeCommand: {
181 flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size);
182 if (Akonadi2::VerifyHandshakeBuffer(verifier)) {
183 auto buffer = Akonadi2::GetHandshake(client.commandBuffer.constData());
184 client.name = buffer->name()->c_str();
185 sendCurrentRevision(client);
186 } else {
187 qWarning() << "received invalid command";
188 }
189 break;
190 }
191 case Akonadi2::Commands::SynchronizeCommand: {
192 log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name));
193 loadResource();
194 if (m_resource) {
195 qDebug() << "synchronizing";
196 m_resource->synchronizeWithSource(m_pipeline).then<void>([callback](Async::Future<void> &f){
197 //FIXME: the command is complete once all pipelines have been drained, track progress and async emit result
198 callback();
199 f.setFinished();
200 }).exec();
201 return;
202 } else {
203 qWarning() << "No resource loaded";
204 }
205 break;
206 }
207 case Akonadi2::Commands::FetchEntityCommand:
208 case Akonadi2::Commands::DeleteEntityCommand:
209 case Akonadi2::Commands::ModifyEntityCommand:
210 case Akonadi2::Commands::CreateEntityCommand:
211 log(QString("\tCommand id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name));
212 m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline);
213 break;
214 default:
215 if (commandId > Akonadi2::Commands::CustomCommand) {
216 loadResource();
217 if (m_resource) {
218 m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline);
219 }
220 } else {
221 //TODO: handle error: we don't know wtf this command is
222 }
223 break;
224 }
225 callback();
226}
227
191bool Listener::processClientBuffer(Client &client) 228bool Listener::processClientBuffer(Client &client)
192{ 229{
193 static const int headerSize = Akonadi2::Commands::headerSize(); 230 static const int headerSize = Akonadi2::Commands::headerSize();
@@ -195,58 +232,22 @@ bool Listener::processClientBuffer(Client &client)
195 return false; 232 return false;
196 } 233 }
197 234
198 int commandId; 235 const uint messageId = *(uint*)client.commandBuffer.constData();
199 uint messageId, size; 236 const int commandId = *(int*)(client.commandBuffer.constData() + sizeof(uint));
200 messageId = *(uint*)client.commandBuffer.constData(); 237 const uint size = *(uint*)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint));
201 commandId = *(int*)(client.commandBuffer.constData() + sizeof(uint));
202 size = *(uint*)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint));
203 238
204 //TODO: reject messages above a certain size? 239 //TODO: reject messages above a certain size?
205 240
206 if (size <= uint(client.commandBuffer.size() - headerSize)) { 241 if (size <= uint(client.commandBuffer.size() - headerSize)) {
207 client.commandBuffer.remove(0, headerSize); 242 client.commandBuffer.remove(0, headerSize);
208 243
209 switch (commandId) { 244 processCommand(commandId, messageId, client, size, [this, messageId, commandId, &client]() {
210 case Akonadi2::Commands::HandshakeCommand: { 245 log(QString("\tCompleted command messageid %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name));
211 flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); 246 //FIXME, client needs to become a shared pointer and not a reference, or we have to search through m_connections everytime.
212 if (Akonadi2::VerifyHandshakeBuffer(verifier)) { 247 sendCommandCompleted(client, messageId);
213 auto buffer = Akonadi2::GetHandshake(client.commandBuffer.constData()); 248 });
214 client.name = buffer->name()->c_str();
215 sendCurrentRevision(client);
216 }
217 break;
218 }
219 case Akonadi2::Commands::SynchronizeCommand: {
220 Akonadi2::Console::main()->log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name));
221 loadResource();
222 if (m_resource) {
223 m_resource->synchronizeWithSource(m_pipeline);
224 }
225 break;
226 }
227 case Akonadi2::Commands::FetchEntityCommand:
228 case Akonadi2::Commands::DeleteEntityCommand:
229 case Akonadi2::Commands::ModifyEntityCommand:
230 case Akonadi2::Commands::CreateEntityCommand:
231 Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name));
232 m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline);
233 break;
234 default:
235 if (commandId > Akonadi2::Commands::CustomCommand) {
236 loadResource();
237 if (m_resource) {
238 m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline);
239 }
240 } else {
241 //TODO: handle error: we don't know wtf this command is
242 }
243 break;
244 }
245
246 //TODO: async commands == async sendCommandCompleted
247 Akonadi2::Console::main()->log(QString("\tCompleted command id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name));
248 client.commandBuffer.remove(0, size); 249 client.commandBuffer.remove(0, size);
249 sendCommandCompleted(client, messageId); 250
250 return client.commandBuffer.size() >= headerSize; 251 return client.commandBuffer.size() >= headerSize;
251 } 252 }
252 253
@@ -259,7 +260,7 @@ void Listener::sendCurrentRevision(Client &client)
259 return; 260 return;
260 } 261 }
261 262
262 auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); 263 auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_pipeline->storage().maxRevision());
263 Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); 264 Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command);
264 Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); 265 Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb);
265 m_fbb.Clear(); 266 m_fbb.Clear();
@@ -279,14 +280,12 @@ void Listener::sendCommandCompleted(Client &client, uint messageId)
279 280
280void Listener::refreshRevision() 281void Listener::refreshRevision()
281{ 282{
282 //TODO this should be coming out of m_pipeline->storage()
283 ++m_revision;
284 updateClientsWithRevision(); 283 updateClientsWithRevision();
285} 284}
286 285
287void Listener::updateClientsWithRevision() 286void Listener::updateClientsWithRevision()
288{ 287{
289 auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); 288 auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_pipeline->storage().maxRevision());
290 Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); 289 Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command);
291 290
292 for (const Client &client: m_connections) { 291 for (const Client &client: m_connections) {
@@ -308,13 +307,19 @@ void Listener::loadResource()
308 Akonadi2::ResourceFactory *resourceFactory = Akonadi2::ResourceFactory::load(m_resourceName); 307 Akonadi2::ResourceFactory *resourceFactory = Akonadi2::ResourceFactory::load(m_resourceName);
309 if (resourceFactory) { 308 if (resourceFactory) {
310 m_resource = resourceFactory->createResource(); 309 m_resource = resourceFactory->createResource();
311 Akonadi2::Console::main()->log(QString("Resource factory: %1").arg((qlonglong)resourceFactory)); 310 log(QString("Resource factory: %1").arg((qlonglong)resourceFactory));
312 Akonadi2::Console::main()->log(QString("\tResource: %1").arg((qlonglong)m_resource)); 311 log(QString("\tResource: %1").arg((qlonglong)m_resource));
313 //TODO: this doesn't really list all the facades .. fix 312 //TODO: this doesn't really list all the facades .. fix
314 Akonadi2::Console::main()->log(QString("\tFacades: %1").arg(Akonadi2::FacadeFactory::instance().getFacade<Akonadi2::Domain::Event>(m_resourceName)->type())); 313 log(QString("\tFacades: %1").arg(Akonadi2::FacadeFactory::instance().getFacade<Akonadi2::Domain::Event>(m_resourceName)->type()));
315 } else { 314 } else {
316 Akonadi2::Console::main()->log(QString("Failed to load resource %1").arg(m_resourceName)); 315 log(QString("Failed to load resource %1").arg(m_resourceName));
317 } 316 }
318 //TODO: on failure ... what? 317 //TODO: on failure ... what?
318 //Enter broken state?
319}
320
321void Listener::log(const QString &message)
322{
323 Akonadi2::Console::main()->log("Listener: " + message);
319} 324}
320 325
diff --git a/synchronizer/listener.h b/synchronizer/listener.h
index 357ae37..4c35191 100644
--- a/synchronizer/listener.h
+++ b/synchronizer/listener.h
@@ -61,9 +61,6 @@ public:
61 Listener(const QString &resourceName, QObject *parent = 0); 61 Listener(const QString &resourceName, QObject *parent = 0);
62 ~Listener(); 62 ~Listener();
63 63
64 void setRevision(unsigned long long revision);
65 unsigned long long revision() const;
66
67Q_SIGNALS: 64Q_SIGNALS:
68 void noClients(); 65 void noClients();
69 66
@@ -79,15 +76,16 @@ private Q_SLOTS:
79 void refreshRevision(); 76 void refreshRevision();
80 77
81private: 78private:
79 void processCommand(int commandId, uint messageId, Client &client, uint size, const std::function<void()> &callback);
82 bool processClientBuffer(Client &client); 80 bool processClientBuffer(Client &client);
83 void sendCurrentRevision(Client &client); 81 void sendCurrentRevision(Client &client);
84 void sendCommandCompleted(Client &client, uint messageId); 82 void sendCommandCompleted(Client &client, uint messageId);
85 void updateClientsWithRevision(); 83 void updateClientsWithRevision();
86 void loadResource(); 84 void loadResource();
85 void log(const QString &);
87 86
88 QLocalServer *m_server; 87 QLocalServer *m_server;
89 QVector<Client> m_connections; 88 QVector<Client> m_connections;
90 unsigned long long m_revision;
91 flatbuffers::FlatBufferBuilder m_fbb; 89 flatbuffers::FlatBufferBuilder m_fbb;
92 const QString m_resourceName; 90 const QString m_resourceName;
93 Akonadi2::Resource *m_resource; 91 Akonadi2::Resource *m_resource;
diff --git a/tests/dummyresourcefacadetest.cpp b/tests/dummyresourcefacadetest.cpp
index d815e9b..e4d27fc 100644
--- a/tests/dummyresourcefacadetest.cpp
+++ b/tests/dummyresourcefacadetest.cpp
@@ -51,17 +51,10 @@ private Q_SLOTS:
51 query.ids << "key50"; 51 query.ids << "key50";
52 query.resources << "dummyresource"; 52 query.resources << "dummyresource";
53 53
54 auto result = Akonadi2::Store::load<Akonadi2::Domain::Event>(query); 54 //FIXME avoid sync somehow. No synchronizer access here (perhaps configure the instance above accordingly?)
55 bool complete = false; 55 async::SyncListResult<Akonadi2::Domain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::Domain::Event>(query));
56 QVector<Akonadi2::Domain::Event::Ptr> results; 56 result.exec();
57 result->onAdded([&results](const Akonadi2::Domain::Event::Ptr &e) { 57 QCOMPARE(result.size(), 1);
58 results << e;
59 });
60 result->onComplete([&complete]() {
61 complete = true;
62 });
63 QTRY_VERIFY(complete);
64 QCOMPARE(results.size(), 1);
65 58
66 Akonadi2::Storage storage(testDataPath, dbName); 59 Akonadi2::Storage storage(testDataPath, dbName);
67 storage.removeFromDisk(); 60 storage.removeFromDisk();
diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp
index 75d29de..0c02675 100644
--- a/tests/dummyresourcetest.cpp
+++ b/tests/dummyresourcetest.cpp
@@ -2,7 +2,7 @@
2 2
3#include <QString> 3#include <QString>
4 4
5#include "common/resource.h" 5#include "dummyresource/resourcefactory.h"
6#include "clientapi.h" 6#include "clientapi.h"
7 7
8class DummyResourceTest : public QObject 8class DummyResourceTest : public QObject
@@ -13,12 +13,23 @@ private Q_SLOTS:
13 { 13 {
14 auto factory = Akonadi2::ResourceFactory::load("org.kde.dummy"); 14 auto factory = Akonadi2::ResourceFactory::load("org.kde.dummy");
15 QVERIFY(factory); 15 QVERIFY(factory);
16 Akonadi2::Storage store(Akonadi2::Store::storageLocation(), "org.kde.dummy", Akonadi2::Storage::ReadWrite);
17 store.removeFromDisk();
16 } 18 }
17 19
18 void cleanupTestCase() 20 void cleanupTestCase()
19 { 21 {
20 } 22 }
21 23
24 void testResource()
25 {
26 Akonadi2::Pipeline pipeline("org.kde.dummy");
27 DummyResource resource;
28 auto job = resource.synchronizeWithSource(&pipeline);
29 auto future = job.exec();
30 QTRY_VERIFY(future.isFinished());
31 }
32
22 void testSync() 33 void testSync()
23 { 34 {
24 Akonadi2::Query query; 35 Akonadi2::Query query;
@@ -27,6 +38,8 @@ private Q_SLOTS:
27 async::SyncListResult<Akonadi2::Domain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::Domain::Event>(query)); 38 async::SyncListResult<Akonadi2::Domain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::Domain::Event>(query));
28 result.exec(); 39 result.exec();
29 QVERIFY(!result.isEmpty()); 40 QVERIFY(!result.isEmpty());
41 auto value = result.first();
42 qDebug() << value->getProperty("summary");
30 } 43 }
31 44
32}; 45};