diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2014-12-21 22:20:31 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2014-12-21 22:20:31 +0100 |
commit | d80ff84c28c0be626c1df4528741cddf5a55f547 (patch) | |
tree | dfa1a3771f52970bfaf7b9e56d8675aeabfc65ef /common | |
parent | d21aa4e35fb96fa3b07888f710cbc3440af8bdd3 (diff) | |
download | sink-d80ff84c28c0be626c1df4528741cddf5a55f547.tar.gz sink-d80ff84c28c0be626c1df4528741cddf5a55f547.zip |
Write-Read loop from clientside.
It's a huge hack but starts to show results.
Most urgently we need:
* reliable command results
* the 3 buffers instead of the 1
* A way to implement storage as preprocessor (or a place to impelement it after the preprocessors).
Diffstat (limited to 'common')
-rw-r--r-- | common/clientapi.h | 19 | ||||
-rw-r--r-- | common/pipeline.cpp | 7 | ||||
-rw-r--r-- | common/resourceaccess.cpp | 20 | ||||
-rw-r--r-- | common/resourceaccess.h | 1 | ||||
-rw-r--r-- | common/storage.h | 5 | ||||
-rw-r--r-- | common/storage_common.cpp | 19 | ||||
-rw-r--r-- | common/storage_lmdb.cpp | 16 | ||||
-rw-r--r-- | common/test/clientapitest.cpp | 3 |
8 files changed, 76 insertions, 14 deletions
diff --git a/common/clientapi.h b/common/clientapi.h index 6054130..d2757e7 100644 --- a/common/clientapi.h +++ b/common/clientapi.h | |||
@@ -278,7 +278,7 @@ public: | |||
278 | virtual void create(const DomainType &domainObject) = 0; | 278 | virtual void create(const DomainType &domainObject) = 0; |
279 | virtual void modify(const DomainType &domainObject) = 0; | 279 | virtual void modify(const DomainType &domainObject) = 0; |
280 | virtual void remove(const DomainType &domainObject) = 0; | 280 | virtual void remove(const DomainType &domainObject) = 0; |
281 | virtual void load(const Query &query, const std::function<void(const typename DomainType::Ptr &)> &resultCallback) = 0; | 281 | virtual void load(const Query &query, const std::function<void(const typename DomainType::Ptr &)> &resultCallback, const std::function<void()> &completeCallback) = 0; |
282 | }; | 282 | }; |
283 | 283 | ||
284 | 284 | ||
@@ -353,7 +353,7 @@ class Store { | |||
353 | public: | 353 | public: |
354 | static QString storageLocation() | 354 | static QString storageLocation() |
355 | { | 355 | { |
356 | return QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2"; | 356 | return QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage"; |
357 | } | 357 | } |
358 | 358 | ||
359 | /** | 359 | /** |
@@ -371,13 +371,24 @@ public: | |||
371 | // Query all resources and aggregate results | 371 | // Query all resources and aggregate results |
372 | // query tells us in which resources we're interested | 372 | // query tells us in which resources we're interested |
373 | // TODO: queries to individual resources could be parallelized | 373 | // TODO: queries to individual resources could be parallelized |
374 | auto eventloop = QSharedPointer<QEventLoop>::create(); | ||
375 | int completeCounter = 0; | ||
374 | for(const QString &resource : query.resources) { | 376 | for(const QString &resource : query.resources) { |
375 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); | 377 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); |
376 | //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive. | 378 | //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive. |
377 | std::function<void(const typename DomainType::Ptr &)> addCallback = std::bind(&ResultProvider<typename DomainType::Ptr>::add, resultSet, std::placeholders::_1); | 379 | std::function<void(const typename DomainType::Ptr &)> addCallback = std::bind(&ResultProvider<typename DomainType::Ptr>::add, resultSet, std::placeholders::_1); |
378 | facade->load(query, addCallback); | 380 | //We copy the facade pointer to keep it alive |
381 | facade->load(query, addCallback, [&completeCounter, &query, resultSet, facade, eventloop]() { | ||
382 | //TODO use jobs instead of this counter | ||
383 | completeCounter++; | ||
384 | if (completeCounter == query.resources.size()) { | ||
385 | resultSet->complete(); | ||
386 | eventloop->quit(); | ||
387 | } | ||
388 | }); | ||
379 | } | 389 | } |
380 | resultSet->complete(); | 390 | //The thread contains no eventloop, so execute one here |
391 | eventloop->exec(QEventLoop::ExcludeUserInputEvents); | ||
381 | }); | 392 | }); |
382 | return resultSet->emitter(); | 393 | return resultSet->emitter(); |
383 | } | 394 | } |
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index cf508c5..739909d 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -31,7 +31,7 @@ class Pipeline::Private | |||
31 | { | 31 | { |
32 | public: | 32 | public: |
33 | Private(const QString &resourceName) | 33 | Private(const QString &resourceName) |
34 | : storage(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceName), | 34 | : storage(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceName, Storage::ReadWrite), |
35 | stepScheduled(false) | 35 | stepScheduled(false) |
36 | { | 36 | { |
37 | } | 37 | } |
@@ -71,6 +71,11 @@ void Pipeline::null() | |||
71 | 71 | ||
72 | void Pipeline::newEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entity) | 72 | void Pipeline::newEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entity) |
73 | { | 73 | { |
74 | const qint64 newRevision = storage().maxRevision() + 1; | ||
75 | //FIXME this should go into a preprocessor | ||
76 | storage().write(key, key.size(), reinterpret_cast<char*>(entity.GetBufferPointer()), entity.GetSize()); | ||
77 | storage().setMaxRevision(newRevision); | ||
78 | |||
74 | PipelineState state(this, NewPipeline, key, d->newPipeline); | 79 | PipelineState state(this, NewPipeline, key, d->newPipeline); |
75 | d->activePipelines << state; | 80 | d->activePipelines << state; |
76 | state.step(); | 81 | state.step(); |
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index a7e14f5..1706ac4 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -58,7 +58,7 @@ public: | |||
58 | 58 | ||
59 | void write(QIODevice *device, uint messageId) | 59 | void write(QIODevice *device, uint messageId) |
60 | { | 60 | { |
61 | Console::main()->log(QString("\tSending queued command %1").arg(m_commandId)); | 61 | // Console::main()->log(QString("\tSending queued command %1").arg(m_commandId)); |
62 | Commands::write(device, messageId, m_commandId, m_buffer, m_bufferSize); | 62 | Commands::write(device, messageId, m_commandId, m_buffer, m_bufferSize); |
63 | } | 63 | } |
64 | 64 | ||
@@ -82,6 +82,7 @@ public: | |||
82 | QByteArray partialMessageBuffer; | 82 | QByteArray partialMessageBuffer; |
83 | flatbuffers::FlatBufferBuilder fbb; | 83 | flatbuffers::FlatBufferBuilder fbb; |
84 | QVector<QueuedCommand *> commandQueue; | 84 | QVector<QueuedCommand *> commandQueue; |
85 | QVector<std::function<void()> > synchronizeResultHandler; | ||
85 | uint messageId; | 86 | uint messageId; |
86 | }; | 87 | }; |
87 | 88 | ||
@@ -149,6 +150,13 @@ void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder & | |||
149 | } | 150 | } |
150 | } | 151 | } |
151 | 152 | ||
153 | void ResourceAccess::synchronizeResource(const std::function<void()> &resultHandler) | ||
154 | { | ||
155 | sendCommand(Commands::SynchronizeCommand); | ||
156 | //TODO: this should be implemented as a job, so we don't need to store the result handler as member | ||
157 | d->synchronizeResultHandler << resultHandler; | ||
158 | } | ||
159 | |||
152 | void ResourceAccess::open() | 160 | void ResourceAccess::open() |
153 | { | 161 | { |
154 | if (d->socket->isValid()) { | 162 | if (d->socket->isValid()) { |
@@ -262,6 +270,13 @@ bool ResourceAccess::processMessageBuffer() | |||
262 | auto buffer = GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); | 270 | auto buffer = GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); |
263 | log(QString("Revision updated to: %1").arg(buffer->revision())); | 271 | log(QString("Revision updated to: %1").arg(buffer->revision())); |
264 | emit revisionChanged(buffer->revision()); | 272 | emit revisionChanged(buffer->revision()); |
273 | |||
274 | //FIXME: The result handler should be called on completion of the synchronize command, and not upon arbitrary revision updates. | ||
275 | for(auto handler : d->synchronizeResultHandler) { | ||
276 | //FIXME: we should associate the handler with a buffer->id() to avoid prematurely triggering the result handler from a delayed synchronized response (this is relevant for on-demand syncing). | ||
277 | handler(); | ||
278 | } | ||
279 | d->synchronizeResultHandler.clear(); | ||
265 | break; | 280 | break; |
266 | } | 281 | } |
267 | case Commands::CommandCompletion: { | 282 | case Commands::CommandCompletion: { |
@@ -280,7 +295,8 @@ bool ResourceAccess::processMessageBuffer() | |||
280 | 295 | ||
281 | void ResourceAccess::log(const QString &message) | 296 | void ResourceAccess::log(const QString &message) |
282 | { | 297 | { |
283 | Console::main()->log(d->resourceName + ": " + message); | 298 | qDebug() << d->resourceName + ": " + message; |
299 | // Console::main()->log(d->resourceName + ": " + message); | ||
284 | } | 300 | } |
285 | 301 | ||
286 | } | 302 | } |
diff --git a/common/resourceaccess.h b/common/resourceaccess.h index 3a35af6..7416b25 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h | |||
@@ -42,6 +42,7 @@ public: | |||
42 | 42 | ||
43 | void sendCommand(int commandId); | 43 | void sendCommand(int commandId); |
44 | void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); | 44 | void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); |
45 | void synchronizeResource(const std::function<void()> &resultHandler); | ||
45 | 46 | ||
46 | public Q_SLOTS: | 47 | public Q_SLOTS: |
47 | void open(); | 48 | void open(); |
diff --git a/common/storage.h b/common/storage.h index 57ee56c..6cfa3d6 100644 --- a/common/storage.h +++ b/common/storage.h | |||
@@ -71,6 +71,11 @@ public: | |||
71 | static std::function<void(const Storage::Error &error)> basicErrorHandler(); | 71 | static std::function<void(const Storage::Error &error)> basicErrorHandler(); |
72 | qint64 diskUsage() const; | 72 | qint64 diskUsage() const; |
73 | void removeFromDisk() const; | 73 | void removeFromDisk() const; |
74 | |||
75 | qint64 maxRevision(); | ||
76 | void setMaxRevision(qint64 revision); | ||
77 | |||
78 | bool exists() const; | ||
74 | private: | 79 | private: |
75 | class Private; | 80 | class Private; |
76 | Private * const d; | 81 | Private * const d; |
diff --git a/common/storage_common.cpp b/common/storage_common.cpp index 8f465fc..bdae9dd 100644 --- a/common/storage_common.cpp +++ b/common/storage_common.cpp | |||
@@ -52,4 +52,23 @@ void Storage::scan(const std::string &sKey, const std::function<bool(void *keyPt | |||
52 | scan(sKey.data(), sKey.size(), resultHandler, &errorHandler); | 52 | scan(sKey.data(), sKey.size(), resultHandler, &errorHandler); |
53 | } | 53 | } |
54 | 54 | ||
55 | void Storage::setMaxRevision(qint64 revision) | ||
56 | { | ||
57 | write("__internal_maxRevision", QString::number(revision).toStdString()); | ||
58 | } | ||
59 | |||
60 | qint64 Storage::maxRevision() | ||
61 | { | ||
62 | qint64 r = 0; | ||
63 | read(std::string("__internal_maxRevision"), [&](const std::string &revision) -> bool { | ||
64 | r = QString::fromStdString(revision).toLongLong(); | ||
65 | return false; | ||
66 | }, | ||
67 | [](const Storage::Error &error) { | ||
68 | //Ignore the error in case we don't find the value | ||
69 | //TODO only ignore value not found errors | ||
70 | }); | ||
71 | return r; | ||
72 | } | ||
73 | |||
55 | } // namespace Akonadi2 | 74 | } // namespace Akonadi2 |
diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp index a8ec378..eeb0045 100644 --- a/common/storage_lmdb.cpp +++ b/common/storage_lmdb.cpp | |||
@@ -59,6 +59,7 @@ QMutex Storage::Private::sMutex; | |||
59 | Storage::Private::Private(const QString &s, const QString &n, AccessMode m) | 59 | Storage::Private::Private(const QString &s, const QString &n, AccessMode m) |
60 | : storageRoot(s), | 60 | : storageRoot(s), |
61 | name(n), | 61 | name(n), |
62 | env(0), | ||
62 | transaction(0), | 63 | transaction(0), |
63 | mode(m), | 64 | mode(m), |
64 | readTransaction(false), | 65 | readTransaction(false), |
@@ -66,7 +67,7 @@ Storage::Private::Private(const QString &s, const QString &n, AccessMode m) | |||
66 | { | 67 | { |
67 | const QString fullPath(storageRoot + '/' + name); | 68 | const QString fullPath(storageRoot + '/' + name); |
68 | QDir dir; | 69 | QDir dir; |
69 | dir.mkdir(storageRoot); | 70 | dir.mkpath(storageRoot); |
70 | dir.mkdir(fullPath); | 71 | dir.mkdir(fullPath); |
71 | 72 | ||
72 | //This seems to resolve threading related issues, not sure why though | 73 | //This seems to resolve threading related issues, not sure why though |
@@ -97,8 +98,10 @@ Storage::Private::~Private() | |||
97 | } | 98 | } |
98 | 99 | ||
99 | // it is still there and still unused, so we can shut it down | 100 | // it is still there and still unused, so we can shut it down |
100 | mdb_dbi_close(env, dbi); | 101 | if (env) { |
101 | mdb_env_close(env); | 102 | mdb_dbi_close(env, dbi); |
103 | mdb_env_close(env); | ||
104 | } | ||
102 | } | 105 | } |
103 | 106 | ||
104 | Storage::Storage(const QString &storageRoot, const QString &name, AccessMode mode) | 107 | Storage::Storage(const QString &storageRoot, const QString &name, AccessMode mode) |
@@ -111,6 +114,10 @@ Storage::~Storage() | |||
111 | delete d; | 114 | delete d; |
112 | } | 115 | } |
113 | 116 | ||
117 | bool Storage::exists() const | ||
118 | { | ||
119 | return (d->env != 0); | ||
120 | } | ||
114 | bool Storage::isInTransaction() const | 121 | bool Storage::isInTransaction() const |
115 | { | 122 | { |
116 | return d->transaction; | 123 | return d->transaction; |
@@ -313,12 +320,9 @@ void Storage::scan(const char *keyData, uint keySize, | |||
313 | errorHandler(error); | 320 | errorHandler(error); |
314 | } | 321 | } |
315 | 322 | ||
316 | /** | ||
317 | we don't abort the transaction since we need it for reading the values | ||
318 | if (implicitTransaction) { | 323 | if (implicitTransaction) { |
319 | abortTransaction(); | 324 | abortTransaction(); |
320 | } | 325 | } |
321 | */ | ||
322 | } | 326 | } |
323 | 327 | ||
324 | qint64 Storage::diskUsage() const | 328 | qint64 Storage::diskUsage() const |
diff --git a/common/test/clientapitest.cpp b/common/test/clientapitest.cpp index 2d1c238..dd634f1 100644 --- a/common/test/clientapitest.cpp +++ b/common/test/clientapitest.cpp | |||
@@ -11,12 +11,13 @@ public: | |||
11 | virtual void create(const Akonadi2::Domain::Event &domainObject){}; | 11 | virtual void create(const Akonadi2::Domain::Event &domainObject){}; |
12 | virtual void modify(const Akonadi2::Domain::Event &domainObject){}; | 12 | virtual void modify(const Akonadi2::Domain::Event &domainObject){}; |
13 | virtual void remove(const Akonadi2::Domain::Event &domainObject){}; | 13 | virtual void remove(const Akonadi2::Domain::Event &domainObject){}; |
14 | virtual void load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback) | 14 | virtual void load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::Domain::Event::Ptr &)> &resultCallback, const std::function<void()> &completeCallback) |
15 | { | 15 | { |
16 | qDebug() << "load called"; | 16 | qDebug() << "load called"; |
17 | for(const auto &result : results) { | 17 | for(const auto &result : results) { |
18 | resultCallback(result); | 18 | resultCallback(result); |
19 | } | 19 | } |
20 | completeCallback(); | ||
20 | } | 21 | } |
21 | 22 | ||
22 | QList<Akonadi2::Domain::Event::Ptr> results; | 23 | QList<Akonadi2::Domain::Event::Ptr> results; |