diff options
-rw-r--r-- | common/genericresource.cpp | 36 | ||||
-rw-r--r-- | common/genericresource.h | 1 | ||||
-rw-r--r-- | examples/maildirresource/libmaildir/maildir.cpp | 7 | ||||
-rw-r--r-- | examples/maildirresource/libmaildir/maildir.h | 5 | ||||
-rw-r--r-- | examples/maildirresource/maildirresource.cpp | 63 | ||||
-rw-r--r-- | examples/maildirresource/maildirresource.h | 6 | ||||
-rw-r--r-- | tests/maildirresourcetest.cpp | 55 |
7 files changed, 167 insertions, 6 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 5e6764a..afe3900 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -46,6 +46,13 @@ public: | |||
46 | return lastReplayedRevision; | 46 | return lastReplayedRevision; |
47 | } | 47 | } |
48 | 48 | ||
49 | bool allChangesReplayed() | ||
50 | { | ||
51 | const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly)); | ||
52 | const qint64 lastReplayedRevision = getLastReplayedRevision(); | ||
53 | return (lastReplayedRevision >= topRevision); | ||
54 | } | ||
55 | |||
49 | Q_SIGNALS: | 56 | Q_SIGNALS: |
50 | void changesReplayed(); | 57 | void changesReplayed(); |
51 | 58 | ||
@@ -62,7 +69,8 @@ public Q_SLOTS: | |||
62 | }); | 69 | }); |
63 | const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); | 70 | const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); |
64 | 71 | ||
65 | if (lastReplayedRevision < topRevision) { | 72 | Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; |
73 | if (lastReplayedRevision <= topRevision) { | ||
66 | qint64 revision = lastReplayedRevision; | 74 | qint64 revision = lastReplayedRevision; |
67 | for (;revision <= topRevision; revision++) { | 75 | for (;revision <= topRevision; revision++) { |
68 | const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); | 76 | const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); |
@@ -82,6 +90,7 @@ public Q_SLOTS: | |||
82 | replayStoreTransaction.commit(); | 90 | replayStoreTransaction.commit(); |
83 | Trace() << "Replayed until " << revision; | 91 | Trace() << "Replayed until " << revision; |
84 | } | 92 | } |
93 | emit changesReplayed(); | ||
85 | } | 94 | } |
86 | 95 | ||
87 | private: | 96 | private: |
@@ -269,8 +278,7 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
269 | mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) { | 278 | mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) { |
270 | return this->replay(type, key, value); | 279 | return this->replay(type, key, value); |
271 | }); | 280 | }); |
272 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged); | 281 | enableChangeReplay(true); |
273 | QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | ||
274 | mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); | 282 | mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); |
275 | mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision()); | 283 | mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision()); |
276 | 284 | ||
@@ -285,6 +293,18 @@ GenericResource::~GenericResource() | |||
285 | delete mSourceChangeReplay; | 293 | delete mSourceChangeReplay; |
286 | } | 294 | } |
287 | 295 | ||
296 | void GenericResource::enableChangeReplay(bool enable) | ||
297 | { | ||
298 | if (enable) { | ||
299 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged); | ||
300 | QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | ||
301 | mSourceChangeReplay->revisionChanged(); | ||
302 | } else { | ||
303 | QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged); | ||
304 | QObject::disconnect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | ||
305 | } | ||
306 | } | ||
307 | |||
288 | void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Akonadi2::Preprocessor*> &preprocessors) | 308 | void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Akonadi2::Preprocessor*> &preprocessors) |
289 | { | 309 | { |
290 | mPipeline->setPreprocessors(type, preprocessors); | 310 | mPipeline->setPreprocessors(type, preprocessors); |
@@ -380,6 +400,16 @@ KAsync::Job<void> GenericResource::processAllMessages() | |||
380 | waitForDrained(f, mSynchronizerQueue); | 400 | waitForDrained(f, mSynchronizerQueue); |
381 | }).then<void>([this](KAsync::Future<void> &f) { | 401 | }).then<void>([this](KAsync::Future<void> &f) { |
382 | waitForDrained(f, mUserQueue); | 402 | waitForDrained(f, mUserQueue); |
403 | }).then<void>([this](KAsync::Future<void> &f) { | ||
404 | if (mSourceChangeReplay->allChangesReplayed()) { | ||
405 | f.setFinished(); | ||
406 | } else { | ||
407 | auto context = new QObject; | ||
408 | QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, context, [&f, context]() { | ||
409 | delete context; | ||
410 | f.setFinished(); | ||
411 | }); | ||
412 | } | ||
383 | }); | 413 | }); |
384 | } | 414 | } |
385 | 415 | ||
diff --git a/common/genericresource.h b/common/genericresource.h index 1aa4206..a58a7c3 100644 --- a/common/genericresource.h +++ b/common/genericresource.h | |||
@@ -56,6 +56,7 @@ private Q_SLOTS: | |||
56 | void updateLowerBoundRevision(); | 56 | void updateLowerBoundRevision(); |
57 | 57 | ||
58 | protected: | 58 | protected: |
59 | void enableChangeReplay(bool); | ||
59 | void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Akonadi2::Preprocessor*> &preprocessors); | 60 | void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Akonadi2::Preprocessor*> &preprocessors); |
60 | virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value); | 61 | virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value); |
61 | void onProcessorError(int errorCode, const QString &errorMessage); | 62 | void onProcessorError(int errorCode, const QString &errorMessage); |
diff --git a/examples/maildirresource/libmaildir/maildir.cpp b/examples/maildirresource/libmaildir/maildir.cpp index 67a2d2d..2b0148c 100644 --- a/examples/maildirresource/libmaildir/maildir.cpp +++ b/examples/maildirresource/libmaildir/maildir.cpp | |||
@@ -332,6 +332,13 @@ bool Maildir::create() | |||
332 | return true; | 332 | return true; |
333 | } | 333 | } |
334 | 334 | ||
335 | bool Maildir::remove() | ||
336 | { | ||
337 | QDir dir(d->path); | ||
338 | dir.removeRecursively(); | ||
339 | return true; | ||
340 | } | ||
341 | |||
335 | QString Maildir::path() const | 342 | QString Maildir::path() const |
336 | { | 343 | { |
337 | return d->path; | 344 | return d->path; |
diff --git a/examples/maildirresource/libmaildir/maildir.h b/examples/maildirresource/libmaildir/maildir.h index 6853033..f80ba5d 100644 --- a/examples/maildirresource/libmaildir/maildir.h +++ b/examples/maildirresource/libmaildir/maildir.h | |||
@@ -71,6 +71,11 @@ public: | |||
71 | bool create(); | 71 | bool create(); |
72 | 72 | ||
73 | /** | 73 | /** |
74 | * Remove the maildir and everything it contains. | ||
75 | */ | ||
76 | bool remove(); | ||
77 | |||
78 | /** | ||
74 | * Returns the path of this maildir. | 79 | * Returns the path of this maildir. |
75 | */ | 80 | */ |
76 | QString path() const; | 81 | QString path() const; |
diff --git a/examples/maildirresource/maildirresource.cpp b/examples/maildirresource/maildirresource.cpp index d0b663b..8333f76 100644 --- a/examples/maildirresource/maildirresource.cpp +++ b/examples/maildirresource/maildirresource.cpp | |||
@@ -63,6 +63,22 @@ MaildirResource::MaildirResource(const QByteArray &instanceIdentifier, const QSh | |||
63 | Trace() << "Started maildir resource for maildir: " << mMaildirPath; | 63 | Trace() << "Started maildir resource for maildir: " << mMaildirPath; |
64 | } | 64 | } |
65 | 65 | ||
66 | void MaildirResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction) | ||
67 | { | ||
68 | Index index("rid.mapping." + bufferType, transaction); | ||
69 | Index localIndex("localid.mapping." + bufferType, transaction); | ||
70 | index.add(remoteId, localId); | ||
71 | localIndex.add(localId, remoteId); | ||
72 | } | ||
73 | |||
74 | void MaildirResource::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction) | ||
75 | { | ||
76 | Index index("rid.mapping." + bufferType, transaction); | ||
77 | Index localIndex("localid.mapping." + bufferType, transaction); | ||
78 | index.remove(remoteId, localId); | ||
79 | localIndex.remove(localId, remoteId); | ||
80 | } | ||
81 | |||
66 | QString MaildirResource::resolveRemoteId(const QByteArray &bufferType, const QString &remoteId, Akonadi2::Storage::Transaction &transaction) | 82 | QString MaildirResource::resolveRemoteId(const QByteArray &bufferType, const QString &remoteId, Akonadi2::Storage::Transaction &transaction) |
67 | { | 83 | { |
68 | //Lookup local id for remote id, or insert a new pair otherwise | 84 | //Lookup local id for remote id, or insert a new pair otherwise |
@@ -332,17 +348,62 @@ KAsync::Job<void> MaildirResource::synchronizeWithSource() | |||
332 | { | 348 | { |
333 | Log() << " Synchronizing"; | 349 | Log() << " Synchronizing"; |
334 | return KAsync::start<void>([this]() { | 350 | return KAsync::start<void>([this]() { |
351 | //Changereplay would deadlock otherwise when trying to open the synchronization store | ||
352 | enableChangeReplay(false); | ||
335 | auto transaction = Akonadi2::Storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier, Akonadi2::Storage::ReadOnly).createTransaction(Akonadi2::Storage::ReadOnly); | 353 | auto transaction = Akonadi2::Storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier, Akonadi2::Storage::ReadOnly).createTransaction(Akonadi2::Storage::ReadOnly); |
336 | synchronizeFolders(transaction); | 354 | synchronizeFolders(transaction); |
337 | for (const auto &folder : listAvailableFolders()) { | 355 | for (const auto &folder : listAvailableFolders()) { |
338 | synchronizeMails(transaction, folder); | 356 | synchronizeMails(transaction, folder); |
339 | } | 357 | } |
358 | Log() << "Done Synchronizing"; | ||
359 | enableChangeReplay(true); | ||
340 | }); | 360 | }); |
341 | } | 361 | } |
342 | 362 | ||
343 | KAsync::Job<void> MaildirResource::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) | 363 | KAsync::Job<void> MaildirResource::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) |
344 | { | 364 | { |
345 | Trace() << "Replaying " << key; | 365 | //This results in a deadlock during sync |
366 | Akonadi2::Storage store(Akonadi2::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Akonadi2::Storage::ReadWrite); | ||
367 | auto synchronizationTransaction = store.createTransaction(Akonadi2::Storage::ReadWrite); | ||
368 | const auto uid = Akonadi2::Storage::uidFromKey(key); | ||
369 | const auto remoteId = resolveLocalId(type, uid, synchronizationTransaction); | ||
370 | |||
371 | Trace() << "Replaying " << key << type; | ||
372 | if (type == ENTITY_TYPE_FOLDER) { | ||
373 | Akonadi2::EntityBuffer buffer(value.data(), value.size()); | ||
374 | const Akonadi2::Entity &entity = buffer.entity(); | ||
375 | const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata()); | ||
376 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | ||
377 | const auto operation = metadataBuffer ? metadataBuffer->operation() : Akonadi2::Operation_Creation; | ||
378 | if (operation == Akonadi2::Operation_Creation) { | ||
379 | //FIXME: This check only works for new entities | ||
380 | //Figure out wether we have replayed that revision already to the source | ||
381 | if (!remoteId.isEmpty()) { | ||
382 | Trace() << "Change is coming from the source"; | ||
383 | return KAsync::null<void>(); | ||
384 | } | ||
385 | const Akonadi2::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mFolderAdaptorFactory->createAdaptor(entity)); | ||
386 | auto folderName = folder.getProperty("name").toString(); | ||
387 | //TODO handle non toplevel folders | ||
388 | auto path = mMaildirPath + "/" + folderName; | ||
389 | Trace() << "Creating a new folder: " << path; | ||
390 | KPIM::Maildir maildir(path, false); | ||
391 | maildir.create(); | ||
392 | recordRemoteId(ENTITY_TYPE_FOLDER, folder.identifier(), path.toUtf8(), synchronizationTransaction); | ||
393 | } else if (operation == Akonadi2::Operation_Removal) { | ||
394 | const auto uid = Akonadi2::Storage::uidFromKey(key); | ||
395 | const auto remoteId = resolveLocalId(ENTITY_TYPE_FOLDER, uid, synchronizationTransaction); | ||
396 | const auto path = remoteId; | ||
397 | Trace() << "Removing a folder: " << path; | ||
398 | KPIM::Maildir maildir(path, false); | ||
399 | maildir.remove(); | ||
400 | removeRemoteId(ENTITY_TYPE_FOLDER, uid, remoteId.toUtf8(), synchronizationTransaction); | ||
401 | } else if (operation == Akonadi2::Operation_Modification) { | ||
402 | Warning() << "Folder modifications are not implemented"; | ||
403 | } else { | ||
404 | Warning() << "Unkown operation" << operation; | ||
405 | } | ||
406 | } | ||
346 | return KAsync::null<void>(); | 407 | return KAsync::null<void>(); |
347 | } | 408 | } |
348 | 409 | ||
diff --git a/examples/maildirresource/maildirresource.h b/examples/maildirresource/maildirresource.h index eec1e97..e577e18 100644 --- a/examples/maildirresource/maildirresource.h +++ b/examples/maildirresource/maildirresource.h | |||
@@ -41,6 +41,12 @@ private: | |||
41 | KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; | 41 | KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; |
42 | 42 | ||
43 | /** | 43 | /** |
44 | * Records a localId to remoteId mapping | ||
45 | */ | ||
46 | void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction); | ||
47 | void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction); | ||
48 | |||
49 | /** | ||
44 | * Tries to find a local id for the remote id, and creates a new local id otherwise. | 50 | * Tries to find a local id for the remote id, and creates a new local id otherwise. |
45 | * | 51 | * |
46 | * The new local id is recorded in the local to remote id mapping. | 52 | * The new local id is recorded in the local to remote id mapping. |
diff --git a/tests/maildirresourcetest.cpp b/tests/maildirresourcetest.cpp index 51ea278..1e2d36b 100644 --- a/tests/maildirresourcetest.cpp +++ b/tests/maildirresourcetest.cpp | |||
@@ -59,8 +59,6 @@ private Q_SLOTS: | |||
59 | copyRecursively(TESTDATAPATH "/maildir1", targetPath); | 59 | copyRecursively(TESTDATAPATH "/maildir1", targetPath); |
60 | 60 | ||
61 | Akonadi2::Log::setDebugOutputLevel(Akonadi2::Log::Trace); | 61 | Akonadi2::Log::setDebugOutputLevel(Akonadi2::Log::Trace); |
62 | auto factory = Akonadi2::ResourceFactory::load("org.kde.maildir"); | ||
63 | QVERIFY(factory); | ||
64 | MaildirResource::removeFromDisk("org.kde.maildir.instance1"); | 62 | MaildirResource::removeFromDisk("org.kde.maildir.instance1"); |
65 | Akonadi2::ApplicationDomain::AkonadiResource resource; | 63 | Akonadi2::ApplicationDomain::AkonadiResource resource; |
66 | resource.setProperty("identifier", "org.kde.maildir.instance1"); | 64 | resource.setProperty("identifier", "org.kde.maildir.instance1"); |
@@ -73,6 +71,7 @@ private Q_SLOTS: | |||
73 | { | 71 | { |
74 | Akonadi2::Store::shutdown(QByteArray("org.kde.maildir.instance1")).exec().waitForFinished(); | 72 | Akonadi2::Store::shutdown(QByteArray("org.kde.maildir.instance1")).exec().waitForFinished(); |
75 | MaildirResource::removeFromDisk("org.kde.maildir.instance1"); | 73 | MaildirResource::removeFromDisk("org.kde.maildir.instance1"); |
74 | Akonadi2::Store::start(QByteArray("org.kde.maildir.instance1")).exec().waitForFinished(); | ||
76 | } | 75 | } |
77 | 76 | ||
78 | void init() | 77 | void init() |
@@ -236,6 +235,58 @@ private Q_SLOTS: | |||
236 | QCOMPARE(mailModel->rowCount(QModelIndex()), 1); | 235 | QCOMPARE(mailModel->rowCount(QModelIndex()), 1); |
237 | } | 236 | } |
238 | 237 | ||
238 | void testCreateFolder() | ||
239 | { | ||
240 | Akonadi2::Query query; | ||
241 | query.resources << "org.kde.maildir.instance1"; | ||
242 | query.syncOnDemand = false; | ||
243 | query.processAll = true; | ||
244 | |||
245 | //Ensure all local data is processed | ||
246 | Akonadi2::Store::synchronize(query).exec().waitForFinished(); | ||
247 | |||
248 | Akonadi2::ApplicationDomain::Folder folder("org.kde.maildir.instance1"); | ||
249 | folder.setProperty("name", "testCreateFolder"); | ||
250 | |||
251 | Akonadi2::Store::create(folder).exec().waitForFinished(); | ||
252 | |||
253 | //Ensure all local data is processed | ||
254 | Akonadi2::Store::synchronize(query).exec().waitForFinished(); | ||
255 | |||
256 | auto targetPath = tempDir.path() + "/maildir1/testCreateFolder"; | ||
257 | QFileInfo file(targetPath); | ||
258 | QTRY_VERIFY(file.exists()); | ||
259 | QVERIFY(file.isDir()); | ||
260 | } | ||
261 | |||
262 | void testRemoveFolder() | ||
263 | { | ||
264 | Akonadi2::Query query; | ||
265 | query.resources << "org.kde.maildir.instance1"; | ||
266 | query.syncOnDemand = false; | ||
267 | query.processAll = true; | ||
268 | |||
269 | auto targetPath = tempDir.path() + "/maildir1/testCreateFolder"; | ||
270 | |||
271 | Akonadi2::ApplicationDomain::Folder folder("org.kde.maildir.instance1"); | ||
272 | folder.setProperty("name", "testCreateFolder"); | ||
273 | Akonadi2::Store::create(folder).exec().waitForFinished(); | ||
274 | Akonadi2::Store::synchronize(query).exec().waitForFinished(); | ||
275 | QTRY_VERIFY(QFileInfo(targetPath).exists()); | ||
276 | |||
277 | Akonadi2::Query folderQuery; | ||
278 | folderQuery.resources << "org.kde.maildir.instance1"; | ||
279 | folderQuery.propertyFilter.insert("name", "testCreateFolder"); | ||
280 | auto model = Akonadi2::Store::loadModel<Akonadi2::ApplicationDomain::Folder>(folderQuery); | ||
281 | QTRY_VERIFY(model->data(QModelIndex(), Akonadi2::Store::ChildrenFetchedRole).toBool()); | ||
282 | QCOMPARE(model->rowCount(QModelIndex()), 1); | ||
283 | auto createdFolder = model->index(0, 0, QModelIndex()).data(Akonadi2::Store::DomainObjectRole).value<Akonadi2::ApplicationDomain::Folder::Ptr>(); | ||
284 | |||
285 | Akonadi2::Store::remove(*createdFolder).exec().waitForFinished(); | ||
286 | Akonadi2::Store::synchronize(query).exec().waitForFinished(); | ||
287 | QTRY_VERIFY(!QFileInfo(targetPath).exists()); | ||
288 | } | ||
289 | |||
239 | }; | 290 | }; |
240 | 291 | ||
241 | QTEST_MAIN(MaildirResourceTest) | 292 | QTEST_MAIN(MaildirResourceTest) |