summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/genericresource.cpp36
-rw-r--r--common/genericresource.h1
-rw-r--r--examples/maildirresource/libmaildir/maildir.cpp7
-rw-r--r--examples/maildirresource/libmaildir/maildir.h5
-rw-r--r--examples/maildirresource/maildirresource.cpp63
-rw-r--r--examples/maildirresource/maildirresource.h6
-rw-r--r--tests/maildirresourcetest.cpp55
7 files changed, 167 insertions, 6 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 5e6764a..afe3900 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -46,6 +46,13 @@ public:
46 return lastReplayedRevision; 46 return lastReplayedRevision;
47 } 47 }
48 48
49 bool allChangesReplayed()
50 {
51 const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly));
52 const qint64 lastReplayedRevision = getLastReplayedRevision();
53 return (lastReplayedRevision >= topRevision);
54 }
55
49Q_SIGNALS: 56Q_SIGNALS:
50 void changesReplayed(); 57 void changesReplayed();
51 58
@@ -62,7 +69,8 @@ public Q_SLOTS:
62 }); 69 });
63 const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); 70 const qint64 topRevision = Storage::maxRevision(mainStoreTransaction);
64 71
65 if (lastReplayedRevision < topRevision) { 72 Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision;
73 if (lastReplayedRevision <= topRevision) {
66 qint64 revision = lastReplayedRevision; 74 qint64 revision = lastReplayedRevision;
67 for (;revision <= topRevision; revision++) { 75 for (;revision <= topRevision; revision++) {
68 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); 76 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision);
@@ -82,6 +90,7 @@ public Q_SLOTS:
82 replayStoreTransaction.commit(); 90 replayStoreTransaction.commit();
83 Trace() << "Replayed until " << revision; 91 Trace() << "Replayed until " << revision;
84 } 92 }
93 emit changesReplayed();
85 } 94 }
86 95
87private: 96private:
@@ -269,8 +278,7 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
269 mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) { 278 mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) {
270 return this->replay(type, key, value); 279 return this->replay(type, key, value);
271 }); 280 });
272 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged); 281 enableChangeReplay(true);
273 QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
274 mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); 282 mClientLowerBoundRevision = mPipeline->cleanedUpRevision();
275 mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision()); 283 mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision());
276 284
@@ -285,6 +293,18 @@ GenericResource::~GenericResource()
285 delete mSourceChangeReplay; 293 delete mSourceChangeReplay;
286} 294}
287 295
296void GenericResource::enableChangeReplay(bool enable)
297{
298 if (enable) {
299 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged);
300 QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
301 mSourceChangeReplay->revisionChanged();
302 } else {
303 QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged);
304 QObject::disconnect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
305 }
306}
307
288void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Akonadi2::Preprocessor*> &preprocessors) 308void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Akonadi2::Preprocessor*> &preprocessors)
289{ 309{
290 mPipeline->setPreprocessors(type, preprocessors); 310 mPipeline->setPreprocessors(type, preprocessors);
@@ -380,6 +400,16 @@ KAsync::Job<void> GenericResource::processAllMessages()
380 waitForDrained(f, mSynchronizerQueue); 400 waitForDrained(f, mSynchronizerQueue);
381 }).then<void>([this](KAsync::Future<void> &f) { 401 }).then<void>([this](KAsync::Future<void> &f) {
382 waitForDrained(f, mUserQueue); 402 waitForDrained(f, mUserQueue);
403 }).then<void>([this](KAsync::Future<void> &f) {
404 if (mSourceChangeReplay->allChangesReplayed()) {
405 f.setFinished();
406 } else {
407 auto context = new QObject;
408 QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, context, [&f, context]() {
409 delete context;
410 f.setFinished();
411 });
412 }
383 }); 413 });
384} 414}
385 415
diff --git a/common/genericresource.h b/common/genericresource.h
index 1aa4206..a58a7c3 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -56,6 +56,7 @@ private Q_SLOTS:
56 void updateLowerBoundRevision(); 56 void updateLowerBoundRevision();
57 57
58protected: 58protected:
59 void enableChangeReplay(bool);
59 void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Akonadi2::Preprocessor*> &preprocessors); 60 void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Akonadi2::Preprocessor*> &preprocessors);
60 virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value); 61 virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value);
61 void onProcessorError(int errorCode, const QString &errorMessage); 62 void onProcessorError(int errorCode, const QString &errorMessage);
diff --git a/examples/maildirresource/libmaildir/maildir.cpp b/examples/maildirresource/libmaildir/maildir.cpp
index 67a2d2d..2b0148c 100644
--- a/examples/maildirresource/libmaildir/maildir.cpp
+++ b/examples/maildirresource/libmaildir/maildir.cpp
@@ -332,6 +332,13 @@ bool Maildir::create()
332 return true; 332 return true;
333} 333}
334 334
335bool Maildir::remove()
336{
337 QDir dir(d->path);
338 dir.removeRecursively();
339 return true;
340}
341
335QString Maildir::path() const 342QString Maildir::path() const
336{ 343{
337 return d->path; 344 return d->path;
diff --git a/examples/maildirresource/libmaildir/maildir.h b/examples/maildirresource/libmaildir/maildir.h
index 6853033..f80ba5d 100644
--- a/examples/maildirresource/libmaildir/maildir.h
+++ b/examples/maildirresource/libmaildir/maildir.h
@@ -71,6 +71,11 @@ public:
71 bool create(); 71 bool create();
72 72
73 /** 73 /**
74 * Remove the maildir and everything it contains.
75 */
76 bool remove();
77
78 /**
74 * Returns the path of this maildir. 79 * Returns the path of this maildir.
75 */ 80 */
76 QString path() const; 81 QString path() const;
diff --git a/examples/maildirresource/maildirresource.cpp b/examples/maildirresource/maildirresource.cpp
index d0b663b..8333f76 100644
--- a/examples/maildirresource/maildirresource.cpp
+++ b/examples/maildirresource/maildirresource.cpp
@@ -63,6 +63,22 @@ MaildirResource::MaildirResource(const QByteArray &instanceIdentifier, const QSh
63 Trace() << "Started maildir resource for maildir: " << mMaildirPath; 63 Trace() << "Started maildir resource for maildir: " << mMaildirPath;
64} 64}
65 65
66void MaildirResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction)
67{
68 Index index("rid.mapping." + bufferType, transaction);
69 Index localIndex("localid.mapping." + bufferType, transaction);
70 index.add(remoteId, localId);
71 localIndex.add(localId, remoteId);
72}
73
74void MaildirResource::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction)
75{
76 Index index("rid.mapping." + bufferType, transaction);
77 Index localIndex("localid.mapping." + bufferType, transaction);
78 index.remove(remoteId, localId);
79 localIndex.remove(localId, remoteId);
80}
81
66QString MaildirResource::resolveRemoteId(const QByteArray &bufferType, const QString &remoteId, Akonadi2::Storage::Transaction &transaction) 82QString MaildirResource::resolveRemoteId(const QByteArray &bufferType, const QString &remoteId, Akonadi2::Storage::Transaction &transaction)
67{ 83{
68 //Lookup local id for remote id, or insert a new pair otherwise 84 //Lookup local id for remote id, or insert a new pair otherwise
@@ -332,17 +348,62 @@ KAsync::Job<void> MaildirResource::synchronizeWithSource()
332{ 348{
333 Log() << " Synchronizing"; 349 Log() << " Synchronizing";
334 return KAsync::start<void>([this]() { 350 return KAsync::start<void>([this]() {
351 //Changereplay would deadlock otherwise when trying to open the synchronization store
352 enableChangeReplay(false);
335 auto transaction = Akonadi2::Storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier, Akonadi2::Storage::ReadOnly).createTransaction(Akonadi2::Storage::ReadOnly); 353 auto transaction = Akonadi2::Storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier, Akonadi2::Storage::ReadOnly).createTransaction(Akonadi2::Storage::ReadOnly);
336 synchronizeFolders(transaction); 354 synchronizeFolders(transaction);
337 for (const auto &folder : listAvailableFolders()) { 355 for (const auto &folder : listAvailableFolders()) {
338 synchronizeMails(transaction, folder); 356 synchronizeMails(transaction, folder);
339 } 357 }
358 Log() << "Done Synchronizing";
359 enableChangeReplay(true);
340 }); 360 });
341} 361}
342 362
343KAsync::Job<void> MaildirResource::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) 363KAsync::Job<void> MaildirResource::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value)
344{ 364{
345 Trace() << "Replaying " << key; 365 //This results in a deadlock during sync
366 Akonadi2::Storage store(Akonadi2::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Akonadi2::Storage::ReadWrite);
367 auto synchronizationTransaction = store.createTransaction(Akonadi2::Storage::ReadWrite);
368 const auto uid = Akonadi2::Storage::uidFromKey(key);
369 const auto remoteId = resolveLocalId(type, uid, synchronizationTransaction);
370
371 Trace() << "Replaying " << key << type;
372 if (type == ENTITY_TYPE_FOLDER) {
373 Akonadi2::EntityBuffer buffer(value.data(), value.size());
374 const Akonadi2::Entity &entity = buffer.entity();
375 const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata());
376 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
377 const auto operation = metadataBuffer ? metadataBuffer->operation() : Akonadi2::Operation_Creation;
378 if (operation == Akonadi2::Operation_Creation) {
379 //FIXME: This check only works for new entities
380 //Figure out wether we have replayed that revision already to the source
381 if (!remoteId.isEmpty()) {
382 Trace() << "Change is coming from the source";
383 return KAsync::null<void>();
384 }
385 const Akonadi2::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mFolderAdaptorFactory->createAdaptor(entity));
386 auto folderName = folder.getProperty("name").toString();
387 //TODO handle non toplevel folders
388 auto path = mMaildirPath + "/" + folderName;
389 Trace() << "Creating a new folder: " << path;
390 KPIM::Maildir maildir(path, false);
391 maildir.create();
392 recordRemoteId(ENTITY_TYPE_FOLDER, folder.identifier(), path.toUtf8(), synchronizationTransaction);
393 } else if (operation == Akonadi2::Operation_Removal) {
394 const auto uid = Akonadi2::Storage::uidFromKey(key);
395 const auto remoteId = resolveLocalId(ENTITY_TYPE_FOLDER, uid, synchronizationTransaction);
396 const auto path = remoteId;
397 Trace() << "Removing a folder: " << path;
398 KPIM::Maildir maildir(path, false);
399 maildir.remove();
400 removeRemoteId(ENTITY_TYPE_FOLDER, uid, remoteId.toUtf8(), synchronizationTransaction);
401 } else if (operation == Akonadi2::Operation_Modification) {
402 Warning() << "Folder modifications are not implemented";
403 } else {
404 Warning() << "Unkown operation" << operation;
405 }
406 }
346 return KAsync::null<void>(); 407 return KAsync::null<void>();
347} 408}
348 409
diff --git a/examples/maildirresource/maildirresource.h b/examples/maildirresource/maildirresource.h
index eec1e97..e577e18 100644
--- a/examples/maildirresource/maildirresource.h
+++ b/examples/maildirresource/maildirresource.h
@@ -41,6 +41,12 @@ private:
41 KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; 41 KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE;
42 42
43 /** 43 /**
44 * Records a localId to remoteId mapping
45 */
46 void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction);
47 void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction);
48
49 /**
44 * Tries to find a local id for the remote id, and creates a new local id otherwise. 50 * Tries to find a local id for the remote id, and creates a new local id otherwise.
45 * 51 *
46 * The new local id is recorded in the local to remote id mapping. 52 * The new local id is recorded in the local to remote id mapping.
diff --git a/tests/maildirresourcetest.cpp b/tests/maildirresourcetest.cpp
index 51ea278..1e2d36b 100644
--- a/tests/maildirresourcetest.cpp
+++ b/tests/maildirresourcetest.cpp
@@ -59,8 +59,6 @@ private Q_SLOTS:
59 copyRecursively(TESTDATAPATH "/maildir1", targetPath); 59 copyRecursively(TESTDATAPATH "/maildir1", targetPath);
60 60
61 Akonadi2::Log::setDebugOutputLevel(Akonadi2::Log::Trace); 61 Akonadi2::Log::setDebugOutputLevel(Akonadi2::Log::Trace);
62 auto factory = Akonadi2::ResourceFactory::load("org.kde.maildir");
63 QVERIFY(factory);
64 MaildirResource::removeFromDisk("org.kde.maildir.instance1"); 62 MaildirResource::removeFromDisk("org.kde.maildir.instance1");
65 Akonadi2::ApplicationDomain::AkonadiResource resource; 63 Akonadi2::ApplicationDomain::AkonadiResource resource;
66 resource.setProperty("identifier", "org.kde.maildir.instance1"); 64 resource.setProperty("identifier", "org.kde.maildir.instance1");
@@ -73,6 +71,7 @@ private Q_SLOTS:
73 { 71 {
74 Akonadi2::Store::shutdown(QByteArray("org.kde.maildir.instance1")).exec().waitForFinished(); 72 Akonadi2::Store::shutdown(QByteArray("org.kde.maildir.instance1")).exec().waitForFinished();
75 MaildirResource::removeFromDisk("org.kde.maildir.instance1"); 73 MaildirResource::removeFromDisk("org.kde.maildir.instance1");
74 Akonadi2::Store::start(QByteArray("org.kde.maildir.instance1")).exec().waitForFinished();
76 } 75 }
77 76
78 void init() 77 void init()
@@ -236,6 +235,58 @@ private Q_SLOTS:
236 QCOMPARE(mailModel->rowCount(QModelIndex()), 1); 235 QCOMPARE(mailModel->rowCount(QModelIndex()), 1);
237 } 236 }
238 237
238 void testCreateFolder()
239 {
240 Akonadi2::Query query;
241 query.resources << "org.kde.maildir.instance1";
242 query.syncOnDemand = false;
243 query.processAll = true;
244
245 //Ensure all local data is processed
246 Akonadi2::Store::synchronize(query).exec().waitForFinished();
247
248 Akonadi2::ApplicationDomain::Folder folder("org.kde.maildir.instance1");
249 folder.setProperty("name", "testCreateFolder");
250
251 Akonadi2::Store::create(folder).exec().waitForFinished();
252
253 //Ensure all local data is processed
254 Akonadi2::Store::synchronize(query).exec().waitForFinished();
255
256 auto targetPath = tempDir.path() + "/maildir1/testCreateFolder";
257 QFileInfo file(targetPath);
258 QTRY_VERIFY(file.exists());
259 QVERIFY(file.isDir());
260 }
261
262 void testRemoveFolder()
263 {
264 Akonadi2::Query query;
265 query.resources << "org.kde.maildir.instance1";
266 query.syncOnDemand = false;
267 query.processAll = true;
268
269 auto targetPath = tempDir.path() + "/maildir1/testCreateFolder";
270
271 Akonadi2::ApplicationDomain::Folder folder("org.kde.maildir.instance1");
272 folder.setProperty("name", "testCreateFolder");
273 Akonadi2::Store::create(folder).exec().waitForFinished();
274 Akonadi2::Store::synchronize(query).exec().waitForFinished();
275 QTRY_VERIFY(QFileInfo(targetPath).exists());
276
277 Akonadi2::Query folderQuery;
278 folderQuery.resources << "org.kde.maildir.instance1";
279 folderQuery.propertyFilter.insert("name", "testCreateFolder");
280 auto model = Akonadi2::Store::loadModel<Akonadi2::ApplicationDomain::Folder>(folderQuery);
281 QTRY_VERIFY(model->data(QModelIndex(), Akonadi2::Store::ChildrenFetchedRole).toBool());
282 QCOMPARE(model->rowCount(QModelIndex()), 1);
283 auto createdFolder = model->index(0, 0, QModelIndex()).data(Akonadi2::Store::DomainObjectRole).value<Akonadi2::ApplicationDomain::Folder::Ptr>();
284
285 Akonadi2::Store::remove(*createdFolder).exec().waitForFinished();
286 Akonadi2::Store::synchronize(query).exec().waitForFinished();
287 QTRY_VERIFY(!QFileInfo(targetPath).exists());
288 }
289
239}; 290};
240 291
241QTEST_MAIN(MaildirResourceTest) 292QTEST_MAIN(MaildirResourceTest)