summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2014-12-21 22:20:31 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2014-12-21 22:20:31 +0100
commitd80ff84c28c0be626c1df4528741cddf5a55f547 (patch)
treedfa1a3771f52970bfaf7b9e56d8675aeabfc65ef /common
parentd21aa4e35fb96fa3b07888f710cbc3440af8bdd3 (diff)
downloadsink-d80ff84c28c0be626c1df4528741cddf5a55f547.tar.gz
sink-d80ff84c28c0be626c1df4528741cddf5a55f547.zip
Write-Read loop from clientside.
It's a huge hack but starts to show results. Most urgently we need: * reliable command results * the 3 buffers instead of the 1 * A way to implement storage as preprocessor (or a place to impelement it after the preprocessors).
Diffstat (limited to 'common')
-rw-r--r--common/clientapi.h19
-rw-r--r--common/pipeline.cpp7
-rw-r--r--common/resourceaccess.cpp20
-rw-r--r--common/resourceaccess.h1
-rw-r--r--common/storage.h5
-rw-r--r--common/storage_common.cpp19
-rw-r--r--common/storage_lmdb.cpp16
-rw-r--r--common/test/clientapitest.cpp3
8 files changed, 76 insertions, 14 deletions
diff --git a/common/clientapi.h b/common/clientapi.h
index 6054130..d2757e7 100644
--- a/common/clientapi.h
+++ b/common/clientapi.h
@@ -278,7 +278,7 @@ public:
278 virtual void create(const DomainType &domainObject) = 0; 278 virtual void create(const DomainType &domainObject) = 0;
279 virtual void modify(const DomainType &domainObject) = 0; 279 virtual void modify(const DomainType &domainObject) = 0;
280 virtual void remove(const DomainType &domainObject) = 0; 280 virtual void remove(const DomainType &domainObject) = 0;
281 virtual void load(const Query &query, const std::function<void(const typename DomainType::Ptr &)> &resultCallback) = 0; 281 virtual void load(const Query &query, const std::function<void(const typename DomainType::Ptr &)> &resultCallback, const std::function<void()> &completeCallback) = 0;
282}; 282};
283 283
284 284
@@ -353,7 +353,7 @@ class Store {
353public: 353public:
354 static QString storageLocation() 354 static QString storageLocation()
355 { 355 {
356 return QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2"; 356 return QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage";
357 } 357 }
358 358
359 /** 359 /**
@@ -371,13 +371,24 @@ public:
371 // Query all resources and aggregate results 371 // Query all resources and aggregate results
372 // query tells us in which resources we're interested 372 // query tells us in which resources we're interested
373 // TODO: queries to individual resources could be parallelized 373 // TODO: queries to individual resources could be parallelized
374 auto eventloop = QSharedPointer<QEventLoop>::create();
375 int completeCounter = 0;
374 for(const QString &resource : query.resources) { 376 for(const QString &resource : query.resources) {
375 auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); 377 auto facade = FacadeFactory::instance().getFacade<DomainType>(resource);
376 //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive. 378 //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive.
377 std::function<void(const typename DomainType::Ptr &)> addCallback = std::bind(&ResultProvider<typename DomainType::Ptr>::add, resultSet, std::placeholders::_1); 379 std::function<void(const typename DomainType::Ptr &)> addCallback = std::bind(&ResultProvider<typename DomainType::Ptr>::add, resultSet, std::placeholders::_1);
378 facade->load(query, addCallback); 380 //We copy the facade pointer to keep it alive
381 facade->load(query, addCallback, [&completeCounter, &query, resultSet, facade, eventloop]() {
382 //TODO use jobs instead of this counter
383 completeCounter++;
384 if (completeCounter == query.resources.size()) {
385 resultSet->complete();
386 eventloop->quit();
387 }
388 });
379 } 389 }
380 resultSet->complete(); 390 //The thread contains no eventloop, so execute one here
391 eventloop->exec(QEventLoop::ExcludeUserInputEvents);
381 }); 392 });
382 return resultSet->emitter(); 393 return resultSet->emitter();
383 } 394 }
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index cf508c5..739909d 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -31,7 +31,7 @@ class Pipeline::Private
31{ 31{
32public: 32public:
33 Private(const QString &resourceName) 33 Private(const QString &resourceName)
34 : storage(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceName), 34 : storage(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceName, Storage::ReadWrite),
35 stepScheduled(false) 35 stepScheduled(false)
36 { 36 {
37 } 37 }
@@ -71,6 +71,11 @@ void Pipeline::null()
71 71
72void Pipeline::newEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entity) 72void Pipeline::newEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entity)
73{ 73{
74 const qint64 newRevision = storage().maxRevision() + 1;
75 //FIXME this should go into a preprocessor
76 storage().write(key, key.size(), reinterpret_cast<char*>(entity.GetBufferPointer()), entity.GetSize());
77 storage().setMaxRevision(newRevision);
78
74 PipelineState state(this, NewPipeline, key, d->newPipeline); 79 PipelineState state(this, NewPipeline, key, d->newPipeline);
75 d->activePipelines << state; 80 d->activePipelines << state;
76 state.step(); 81 state.step();
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index a7e14f5..1706ac4 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -58,7 +58,7 @@ public:
58 58
59 void write(QIODevice *device, uint messageId) 59 void write(QIODevice *device, uint messageId)
60 { 60 {
61 Console::main()->log(QString("\tSending queued command %1").arg(m_commandId)); 61 // Console::main()->log(QString("\tSending queued command %1").arg(m_commandId));
62 Commands::write(device, messageId, m_commandId, m_buffer, m_bufferSize); 62 Commands::write(device, messageId, m_commandId, m_buffer, m_bufferSize);
63 } 63 }
64 64
@@ -82,6 +82,7 @@ public:
82 QByteArray partialMessageBuffer; 82 QByteArray partialMessageBuffer;
83 flatbuffers::FlatBufferBuilder fbb; 83 flatbuffers::FlatBufferBuilder fbb;
84 QVector<QueuedCommand *> commandQueue; 84 QVector<QueuedCommand *> commandQueue;
85 QVector<std::function<void()> > synchronizeResultHandler;
85 uint messageId; 86 uint messageId;
86}; 87};
87 88
@@ -149,6 +150,13 @@ void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &
149 } 150 }
150} 151}
151 152
153void ResourceAccess::synchronizeResource(const std::function<void()> &resultHandler)
154{
155 sendCommand(Commands::SynchronizeCommand);
156 //TODO: this should be implemented as a job, so we don't need to store the result handler as member
157 d->synchronizeResultHandler << resultHandler;
158}
159
152void ResourceAccess::open() 160void ResourceAccess::open()
153{ 161{
154 if (d->socket->isValid()) { 162 if (d->socket->isValid()) {
@@ -262,6 +270,13 @@ bool ResourceAccess::processMessageBuffer()
262 auto buffer = GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); 270 auto buffer = GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize);
263 log(QString("Revision updated to: %1").arg(buffer->revision())); 271 log(QString("Revision updated to: %1").arg(buffer->revision()));
264 emit revisionChanged(buffer->revision()); 272 emit revisionChanged(buffer->revision());
273
274 //FIXME: The result handler should be called on completion of the synchronize command, and not upon arbitrary revision updates.
275 for(auto handler : d->synchronizeResultHandler) {
276 //FIXME: we should associate the handler with a buffer->id() to avoid prematurely triggering the result handler from a delayed synchronized response (this is relevant for on-demand syncing).
277 handler();
278 }
279 d->synchronizeResultHandler.clear();
265 break; 280 break;
266 } 281 }
267 case Commands::CommandCompletion: { 282 case Commands::CommandCompletion: {
@@ -280,7 +295,8 @@ bool ResourceAccess::processMessageBuffer()
280 295
281void ResourceAccess::log(const QString &message) 296void ResourceAccess::log(const QString &message)
282{ 297{
283 Console::main()->log(d->resourceName + ": " + message); 298 qDebug() << d->resourceName + ": " + message;
299 // Console::main()->log(d->resourceName + ": " + message);
284} 300}
285 301
286} 302}
diff --git a/common/resourceaccess.h b/common/resourceaccess.h
index 3a35af6..7416b25 100644
--- a/common/resourceaccess.h
+++ b/common/resourceaccess.h
@@ -42,6 +42,7 @@ public:
42 42
43 void sendCommand(int commandId); 43 void sendCommand(int commandId);
44 void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); 44 void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb);
45 void synchronizeResource(const std::function<void()> &resultHandler);
45 46
46public Q_SLOTS: 47public Q_SLOTS:
47 void open(); 48 void open();
diff --git a/common/storage.h b/common/storage.h
index 57ee56c..6cfa3d6 100644
--- a/common/storage.h
+++ b/common/storage.h
@@ -71,6 +71,11 @@ public:
71 static std::function<void(const Storage::Error &error)> basicErrorHandler(); 71 static std::function<void(const Storage::Error &error)> basicErrorHandler();
72 qint64 diskUsage() const; 72 qint64 diskUsage() const;
73 void removeFromDisk() const; 73 void removeFromDisk() const;
74
75 qint64 maxRevision();
76 void setMaxRevision(qint64 revision);
77
78 bool exists() const;
74private: 79private:
75 class Private; 80 class Private;
76 Private * const d; 81 Private * const d;
diff --git a/common/storage_common.cpp b/common/storage_common.cpp
index 8f465fc..bdae9dd 100644
--- a/common/storage_common.cpp
+++ b/common/storage_common.cpp
@@ -52,4 +52,23 @@ void Storage::scan(const std::string &sKey, const std::function<bool(void *keyPt
52 scan(sKey.data(), sKey.size(), resultHandler, &errorHandler); 52 scan(sKey.data(), sKey.size(), resultHandler, &errorHandler);
53} 53}
54 54
55void Storage::setMaxRevision(qint64 revision)
56{
57 write("__internal_maxRevision", QString::number(revision).toStdString());
58}
59
60qint64 Storage::maxRevision()
61{
62 qint64 r = 0;
63 read(std::string("__internal_maxRevision"), [&](const std::string &revision) -> bool {
64 r = QString::fromStdString(revision).toLongLong();
65 return false;
66 },
67 [](const Storage::Error &error) {
68 //Ignore the error in case we don't find the value
69 //TODO only ignore value not found errors
70 });
71 return r;
72}
73
55} // namespace Akonadi2 74} // namespace Akonadi2
diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp
index a8ec378..eeb0045 100644
--- a/common/storage_lmdb.cpp
+++ b/common/storage_lmdb.cpp
@@ -59,6 +59,7 @@ QMutex Storage::Private::sMutex;
59Storage::Private::Private(const QString &s, const QString &n, AccessMode m) 59Storage::Private::Private(const QString &s, const QString &n, AccessMode m)
60 : storageRoot(s), 60 : storageRoot(s),
61 name(n), 61 name(n),
62 env(0),
62 transaction(0), 63 transaction(0),
63 mode(m), 64 mode(m),
64 readTransaction(false), 65 readTransaction(false),
@@ -66,7 +67,7 @@ Storage::Private::Private(const QString &s, const QString &n, AccessMode m)
66{ 67{
67 const QString fullPath(storageRoot + '/' + name); 68 const QString fullPath(storageRoot + '/' + name);
68 QDir dir; 69 QDir dir;
69 dir.mkdir(storageRoot); 70 dir.mkpath(storageRoot);
70 dir.mkdir(fullPath); 71 dir.mkdir(fullPath);
71 72
72 //This seems to resolve threading related issues, not sure why though 73 //This seems to resolve threading related issues, not sure why though
@@ -97,8 +98,10 @@ Storage::Private::~Private()
97 } 98 }
98 99
99 // it is still there and still unused, so we can shut it down 100 // it is still there and still unused, so we can shut it down
100 mdb_dbi_close(env, dbi); 101 if (env) {
101 mdb_env_close(env); 102 mdb_dbi_close(env, dbi);
103 mdb_env_close(env);
104 }
102} 105}
103 106
104Storage::Storage(const QString &storageRoot, const QString &name, AccessMode mode) 107Storage::Storage(const QString &storageRoot, const QString &name, AccessMode mode)
@@ -111,6 +114,10 @@ Storage::~Storage()
111 delete d; 114 delete d;
112} 115}
113 116
117bool Storage::exists() const
118{
119 return (d->env != 0);
120}
114bool Storage::isInTransaction() const 121bool Storage::isInTransaction() const
115{ 122{
116 return d->transaction; 123 return d->transaction;
@@ -313,12 +320,9 @@ void Storage::scan(const char *keyData, uint keySize,
313 errorHandler(error); 320 errorHandler(error);
314 } 321 }
315 322
316 /**
317 we don't abort the transaction since we need it for reading the values
318 if (implicitTransaction) { 323 if (implicitTransaction) {
319 abortTransaction(); 324 abortTransaction();
320 } 325 }
321 */
322} 326}
323 327
324qint64 Storage::diskUsage() const 328qint64 Storage::diskUsage() const
diff --git a/common/test/clientapitest.cpp b/common/test/clientapitest.cpp
index 2d1c238..dd634f1 100644
--- a/common/test/clientapitest.cpp
+++ b/common/test/clientapitest.cpp
@@ -11,12 +11,13 @@ public:
11 virtual void create(const Akonadi2::Domain::Event &domainObject){}; 11 virtual void create(const Akonadi2::Domain::Event &domainObject){};
12 virtual void modify(const Akonadi2::Domain::Event &domainObject){}; 12 virtual void modify(const Akonadi2::Domain::Event &domainObject){};
13 virtual void remove(const Akonadi2::Domain::Event &domainObject){}; 13 virtual void remove(const Akonadi2::Domain::Event &domainObject){};
14 virtual void load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback) 14 virtual void load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback, const std::function<void()> &completeCallback)
15 { 15 {
16 qDebug() << "load called"; 16 qDebug() << "load called";
17 for(const auto &result : results) { 17 for(const auto &result : results) {
18 resultCallback(result); 18 resultCallback(result);
19 } 19 }
20 completeCallback();
20 } 21 }
21 22
22 QList<Akonadi2::Domain::Event::Ptr> results; 23 QList<Akonadi2::Domain::Event::Ptr> results;