From 6d5be4fb7b8cbc450e2780905eaac9a18b486c5c Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 14 Sep 2016 16:16:42 +0200 Subject: New synchronization algorithm that only fetches the last 14 days. --- examples/imapresource/imapresource.cpp | 129 ++++++++++++++++-------------- examples/imapresource/imapserverproxy.cpp | 94 ++++++++++++++-------- examples/imapresource/imapserverproxy.h | 14 ++-- 3 files changed, 139 insertions(+), 98 deletions(-) diff --git a/examples/imapresource/imapresource.cpp b/examples/imapresource/imapresource.cpp index 63ae07b..aa0fb94 100644 --- a/examples/imapresource/imapresource.cpp +++ b/examples/imapresource/imapresource.cpp @@ -40,10 +40,14 @@ #include "entitystore.h" #include "remoteidmap.h" #include "query.h" + +#include #include #include #include #include +#include +#include #include "imapserverproxy.h" #include "entityreader.h" @@ -149,7 +153,7 @@ public: } } - void synchronizeMails(const QString &path, const QVector &messages) + void synchronizeMails(const QString &path, const Message &message) { auto time = QSharedPointer::create(); time->start(); @@ -159,23 +163,20 @@ public: const auto folderLocalId = syncStore().resolveRemoteId(ENTITY_TYPE_FOLDER, path.toUtf8()); - int count = 0; - for (const auto &message : messages) { - count++; - const auto remoteId = assembleMailRid(folderLocalId, message.uid); + const auto remoteId = assembleMailRid(folderLocalId, message.uid); - SinkTrace() << "Found a mail " << remoteId << message.msg->subject(true)->asUnicodeString() << message.flags; + Q_ASSERT(message.msg); + SinkTrace() << "Found a mail " << remoteId << message.msg->subject(true)->asUnicodeString() << message.flags; - auto mail = Sink::ApplicationDomain::Mail::create(mResourceInstanceIdentifier); - mail.setFolder(folderLocalId); - mail.setMimeMessage(message.msg->encodedContent()); - mail.setUnread(!message.flags.contains(Imap::Flags::Seen)); - mail.setImportant(message.flags.contains(Imap::Flags::Flagged)); + auto mail = Sink::ApplicationDomain::Mail::create(mResourceInstanceIdentifier); + mail.setFolder(folderLocalId); + mail.setMimeMessage(message.msg->encodedContent()); + mail.setUnread(!message.flags.contains(Imap::Flags::Seen)); + mail.setImportant(message.flags.contains(Imap::Flags::Flagged)); - createOrModify(bufferType, remoteId, mail); - } - const auto elapsed = time->elapsed(); - SinkTrace() << "Synchronized " << count << " mails in " << path << Sink::Log::TraceTime(elapsed) << " " << elapsed/qMax(count, 1) << " [ms/mail]"; + createOrModify(bufferType, remoteId, mail); + // const auto elapsed = time->elapsed(); + // SinkTrace() << "Synchronized " << count << " mails in " << path << Sink::Log::TraceTime(elapsed) << " " << elapsed/qMax(count, 1) << " [ms/mail]"; } void synchronizeRemovals(const QString &path, const QSet &messages) @@ -220,29 +221,71 @@ public: auto capabilities = imap->getCapabilities(); bool canDoIncrementalRemovals = false; return KAsync::start([=]() { + //First we fetch flag changes for all messages. Since we don't know which messages are locally available we just get everything and only apply to what we have. + SinkLog() << "About to update flags" << folder.normalizedPath(); auto uidNext = syncStore().readValue(folder.normalizedPath().toUtf8() + "uidnext").toLongLong(); const auto changedsince = syncStore().readValue(folder.normalizedPath().toUtf8() + "changedsince").toLongLong(); - return imap->fetchFlags(folder, KIMAP2::ImapSet(1, qMax(uidNext, qint64(1))), changedsince, [this, folder](const QVector &messages) { - // synchronizeMails(folder.normalizedPath(), messages); + return imap->fetchFlags(folder, KIMAP2::ImapSet(1, qMax(uidNext, qint64(1))), changedsince, [this, folder](const Message &message) { const auto folderLocalId = syncStore().resolveRemoteId(ENTITY_TYPE_FOLDER, folder.normalizedPath().toUtf8()); - for (const auto &message : messages) { - const auto remoteId = assembleMailRid(folderLocalId, message.uid); + const auto remoteId = assembleMailRid(folderLocalId, message.uid); + SinkLog() << "Updating mail flags " << remoteId << message.flags; - auto mail = Sink::ApplicationDomain::Mail::create(mResourceInstanceIdentifier); - mail.setUnread(!message.flags.contains(Imap::Flags::Seen)); - mail.setImportant(message.flags.contains(Imap::Flags::Flagged)); + auto mail = Sink::ApplicationDomain::Mail::create(mResourceInstanceIdentifier); + mail.setUnread(!message.flags.contains(Imap::Flags::Seen)); + mail.setImportant(message.flags.contains(Imap::Flags::Flagged)); - modify(ENTITY_TYPE_MAIL, remoteId, mail); - } + modify(ENTITY_TYPE_MAIL, remoteId, mail); }) .syncThen([this, folder](const SelectResult &selectResult) { syncStore().writeValue(folder.normalizedPath().toUtf8() + "changedsince", QByteArray::number(selectResult.highestModSequence)); }); }) + .then([=]() { + //Get the range we're interested in. This is what we're going to download. + return imap->fetchUidsSince(imap->mailboxFromFolder(folder), QDate::currentDate().addDays(-14)) + .then>([this, folder, imap](const QVector &uidsToFetch) { + SinkTrace() << "Received result set " << uidsToFetch; + SinkTrace() << "About to fetch mail" << folder.normalizedPath(); + const auto uidNext = syncStore().readValue(folder.normalizedPath().toUtf8() + "uidnext").toLongLong(); + QVector filteredAndSorted = uidsToFetch; + qSort(filteredAndSorted.begin(), filteredAndSorted.end(), qGreater()); + auto lowerBound = qLowerBound(filteredAndSorted.begin(), filteredAndSorted.end(), uidNext, qGreater()); + if (lowerBound != filteredAndSorted.end()) { + filteredAndSorted.erase(lowerBound, filteredAndSorted.end()); + } + + auto maxUid = QSharedPointer::create(0); + if (!filteredAndSorted.isEmpty()) { + *maxUid = filteredAndSorted.first(); + } + SinkTrace() << "Uids to fetch: " << filteredAndSorted; + return imap->fetchMessages(folder, filteredAndSorted, [this, folder, maxUid](const Message &m) { + if (*maxUid < m.uid) { + *maxUid = m.uid; + } + synchronizeMails(folder.normalizedPath(), m); + }, + [this, maxUid, folder](int progress, int total) { + SinkLog() << "Progress: " << progress << " out of " << total; + //commit every 10 messages + if ((progress % 10) == 0) { + commit(); + } + }) + .syncThen([this, maxUid, folder]() { + SinkLog() << "UIDMAX: " << *maxUid << folder.normalizedPath(); + if (*maxUid > 0) { + syncStore().writeValue(folder.normalizedPath().toUtf8() + "uidnext", QByteArray::number(*maxUid)); + } + commit(); + }); + }); + }) .then([=]() { //TODO Remove what's no longer existing if (canDoIncrementalRemovals) { + //TODO do an examine with QRESYNC and remove VANISHED messages } else { return imap->fetchUids(folder).syncThen>([this, folder](const QVector &uids) { SinkTrace() << "Syncing removals"; @@ -251,32 +294,6 @@ public: }); } return KAsync::null(); - }) - .then([this, folder, imap]() { - SinkTrace() << "About to fetch mail"; - const auto uidNext = syncStore().readValue(folder.normalizedPath().toUtf8() + "uidnext").toLongLong(); - auto maxUid = QSharedPointer::create(0); - return imap->fetchMessages(folder, uidNext, [this, folder, maxUid](const QVector &messages) { - SinkTrace() << "Got mail"; - for (const auto &m : messages) { - if (*maxUid < m.uid) { - *maxUid = m.uid; - } - } - synchronizeMails(folder.normalizedPath(), messages); - }, - [this, maxUid, folder](int progress, int total) { - SinkLog() << "Progress: " << progress << " out of " << total; - //commit every 10 messages - if ((progress % 10) == 0) { - commit(); - } - }) - .syncThen([this, maxUid, folder]() { - syncStore().writeValue(folder.normalizedPath().toUtf8() + "uidnext", QByteArray::number(*maxUid)); - SinkLog() << "UIDMAX: " << *maxUid << folder.normalizedPath(); - commit(); - }); }); @@ -578,10 +595,8 @@ KAsync::Job ImapResource::inspect(int inspectionType, const QByteArray &in auto inspectionJob = imap->login(mUser, mPassword) .then(imap->select(folderRemoteId)) .syncThen([](Imap::SelectResult){}) - .then(imap->fetch(set, scope, [imap, messageByUid](const QVector &messages) { - for (const auto &m : messages) { - messageByUid->insert(m.uid, m); - } + .then(imap->fetch(set, scope, [imap, messageByUid](const Imap::Message &message) { + messageByUid->insert(message.uid, message); })); if (inspectionType == Sink::ResourceControl::Inspection::PropertyInspectionType) { @@ -641,10 +656,8 @@ KAsync::Job ImapResource::inspect(int inspectionType, const QByteArray &in auto messageByUid = QSharedPointer>::create(); return imap->login(mUser, mPassword) .then(imap->select(remoteId).syncThen([](){})) - .then(imap->fetch(set, scope, [=](const QVector &messages) { - for (const auto &m : messages) { - messageByUid->insert(m.uid, m); - } + .then(imap->fetch(set, scope, [=](const Imap::Message message) { + messageByUid->insert(message.uid, message); })) .then([imap, messageByUid, expectedCount]() { if (messageByUid->size() != expectedCount) { diff --git a/examples/imapresource/imapserverproxy.cpp b/examples/imapresource/imapserverproxy.cpp index a75825e..58164d3 100644 --- a/examples/imapresource/imapserverproxy.cpp +++ b/examples/imapresource/imapserverproxy.cpp @@ -262,22 +262,25 @@ KAsync::Job ImapServerProxy::fetch(const KIMAP2::ImapSet &set, KIMAP2::Fet } KAsync::Job> ImapServerProxy::search(const KIMAP2::ImapSet &set) +{ + return search(KIMAP2::Term(KIMAP2::Term::Uid, set)); +} + +KAsync::Job> ImapServerProxy::search(const KIMAP2::Term &term) { auto search = new KIMAP2::SearchJob(mSession); - search->setTerm(KIMAP2::Term(KIMAP2::Term::Uid, set)); + search->setTerm(term); search->setUidBased(true); return runJob>(search, [](KJob *job) -> QVector { return static_cast(job)->results(); }); } -KAsync::Job ImapServerProxy::fetch(const KIMAP2::ImapSet &set, KIMAP2::FetchJob::FetchScope scope, const std::function &)> &callback) +KAsync::Job ImapServerProxy::fetch(const KIMAP2::ImapSet &set, KIMAP2::FetchJob::FetchScope scope, const std::function &callback) { return fetch(set, scope, [callback](const KIMAP2::FetchJob::Result &result) { - QVector list; - list << Message{result.uid, result.size, result.attributes, result.flags, result.message}; - callback(list); + callback(Message{result.uid, result.size, result.attributes, result.flags, result.message}); }); } @@ -286,9 +289,9 @@ QStringList ImapServerProxy::getCapabilities() const return mCapabilities; } -KAsync::Job> ImapServerProxy::fetchHeaders(const QString &mailbox, const qint64 minUid) +KAsync::Job> ImapServerProxy::fetchHeaders(const QString &mailbox, const qint64 minUid) { - auto list = QSharedPointer>::create(); + auto list = QSharedPointer>::create(); KIMAP2::FetchJob::FetchScope scope; scope.mode = KIMAP2::FetchJob::FetchScope::Flags; @@ -304,14 +307,19 @@ KAsync::Job> ImapServerProxy::fetchHeaders(const QString &mailbox, list->append(result.uid); }) - .syncThen>([list](){ + .syncThen>([list](){ return *list; }); } KAsync::Job> ImapServerProxy::fetchUids(const QString &mailbox) { - return select(mailbox).then>(search(KIMAP2::ImapSet(1, 0))); + return select(mailbox).then>(search(KIMAP2::Term(KIMAP2::Term::Uid, KIMAP2::ImapSet(1, 0)))); +} + +KAsync::Job> ImapServerProxy::fetchUidsSince(const QString &mailbox, const QDate &since) +{ + return select(mailbox).then>(search(KIMAP2::Term(KIMAP2::Term::Since, since))); } KAsync::Job ImapServerProxy::list(KIMAP2::ListJob::Option option, const std::function &flags)> &callback) @@ -390,7 +398,7 @@ QString ImapServerProxy::mailboxFromFolder(const Folder &folder) const return folder.pathParts.join(mPersonalNamespaceSeparator); } -KAsync::Job ImapServerProxy::fetchFlags(const Folder &folder, const KIMAP2::ImapSet &set, qint64 changedsince, std::function &)> callback) +KAsync::Job ImapServerProxy::fetchFlags(const Folder &folder, const KIMAP2::ImapSet &set, qint64 changedsince, std::function callback) { SinkTrace() << "Fetching flags " << folder.normalizedPath(); return select(mailboxFromFolder(folder)).then([=](const SelectResult &selectResult) -> KAsync::Job { @@ -413,7 +421,7 @@ KAsync::Job ImapServerProxy::fetchFlags(const Folder &folder, cons }); } -KAsync::Job ImapServerProxy::fetchMessages(const Folder &folder, qint64 uidNext, std::function &)> callback, std::function progress) +KAsync::Job ImapServerProxy::fetchMessages(const Folder &folder, qint64 uidNext, std::function callback, std::function progress) { auto time = QSharedPointer::create(); time->start(); @@ -426,40 +434,56 @@ KAsync::Job ImapServerProxy::fetchMessages(const Folder &folder, qint64 ui } SinkTrace() << "Fetching messages from " << folder.normalizedPath() << selectResult.uidNext << uidNext; - return fetchHeaders(mailboxFromFolder(folder), (uidNext + 1)).then>([this, callback, time, progress, folder](const QList &uidsToFetch){ + return fetchHeaders(mailboxFromFolder(folder), (uidNext + 1)).then>([this, callback, time, progress, folder](const QVector &uidsToFetch){ SinkTrace() << "Fetched headers" << folder.normalizedPath(); SinkTrace() << " Total: " << uidsToFetch.size(); SinkTrace() << " Uids to fetch: " << uidsToFetch; SinkTrace() << " Took: " << Sink::Log::TraceTime(time->elapsed()); - auto totalCount = uidsToFetch.size(); + return fetchMessages(folder, uidsToFetch, callback, progress); + }); + + }); +} + +KAsync::Job ImapServerProxy::fetchMessages(const Folder &folder, const QVector &uidsToFetch, std::function callback, std::function progress) +{ + auto time = QSharedPointer::create(); + time->start(); + Q_ASSERT(!mPersonalNamespaceSeparator.isNull()); + return select(mailboxFromFolder(folder)).then([this, callback, folder, time, progress, uidsToFetch](const SelectResult &selectResult) -> KAsync::Job { + + SinkTrace() << "Fetching messages" << folder.normalizedPath(); + SinkTrace() << " Total: " << uidsToFetch.size(); + SinkTrace() << " Uids to fetch: " << uidsToFetch; + auto totalCount = uidsToFetch.size(); + if (progress) { + progress(0, totalCount); + } + if (uidsToFetch.isEmpty()) { + SinkTrace() << "Nothing to fetch"; + return KAsync::null(); + } + KIMAP2::FetchJob::FetchScope scope; + scope.parts.clear(); + scope.mode = KIMAP2::FetchJob::FetchScope::Full; + + KIMAP2::ImapSet set; + set.add(uidsToFetch); + auto count = QSharedPointer::create(); + return fetch(set, scope, [=](const Message &message) { + *count += 1; if (progress) { - progress(0, totalCount); - } - if (uidsToFetch.isEmpty()) { - SinkTrace() << "Nothing to fetch"; - callback(QVector()); - return KAsync::null(); + progress(*count, totalCount); } - KIMAP2::FetchJob::FetchScope scope; - scope.parts.clear(); - scope.mode = KIMAP2::FetchJob::FetchScope::Full; - - KIMAP2::ImapSet set; - set.add(uidsToFetch.toVector()); - auto count = QSharedPointer::create(); - return fetch(set, scope, [=](const QVector &messages) { - *count += messages.size(); - if (progress) { - progress(*count, totalCount); - } - callback(messages); - }); + callback(message); }); - + }) + .syncThen([time]() { + SinkTrace() << "The fetch took: " << Sink::Log::TraceTime(time->elapsed()); }); } -KAsync::Job ImapServerProxy::fetchMessages(const Folder &folder, std::function &)> callback, std::function progress) +KAsync::Job ImapServerProxy::fetchMessages(const Folder &folder, std::function callback, std::function progress) { return fetchMessages(folder, 0, callback, progress); } diff --git a/examples/imapresource/imapserverproxy.h b/examples/imapresource/imapserverproxy.h index 452b479..8b39f23 100644 --- a/examples/imapresource/imapserverproxy.h +++ b/examples/imapresource/imapserverproxy.h @@ -25,6 +25,7 @@ #include #include #include +#include namespace Imap { @@ -110,30 +111,33 @@ public: KAsync::Job expunge(const KIMAP2::ImapSet &set); KAsync::Job copy(const KIMAP2::ImapSet &set, const QString &newMailbox); KAsync::Job> search(const KIMAP2::ImapSet &set); + KAsync::Job> search(const KIMAP2::Term &term); typedef std::function FetchCallback; KAsync::Job fetch(const KIMAP2::ImapSet &set, KIMAP2::FetchJob::FetchScope scope, FetchCallback callback); - KAsync::Job fetch(const KIMAP2::ImapSet &set, KIMAP2::FetchJob::FetchScope scope, const std::function &)> &callback); + KAsync::Job fetch(const KIMAP2::ImapSet &set, KIMAP2::FetchJob::FetchScope scope, const std::function &callback); KAsync::Job list(KIMAP2::ListJob::Option option, const std::function &flags)> &callback); QStringList getCapabilities() const; //Composed calls that do login etc. - KAsync::Job> fetchHeaders(const QString &mailbox, qint64 minUid = 1); + KAsync::Job> fetchHeaders(const QString &mailbox, qint64 minUid = 1); KAsync::Job remove(const QString &mailbox, const KIMAP2::ImapSet &set); KAsync::Job remove(const QString &mailbox, const QByteArray &imapSet); KAsync::Job move(const QString &mailbox, const KIMAP2::ImapSet &set, const QString &newMailbox); KAsync::Job createSubfolder(const QString &parentMailbox, const QString &folderName); KAsync::Job renameSubfolder(const QString &mailbox, const QString &newName); KAsync::Job> fetchUids(const QString &mailbox); + KAsync::Job> fetchUidsSince(const QString &mailbox, const QDate &since); QString mailboxFromFolder(const Folder &) const; KAsync::Job fetchFolders(std::function callback); - KAsync::Job fetchMessages(const Folder &folder, std::function &)> callback, std::function progress = std::function()); - KAsync::Job fetchMessages(const Folder &folder, qint64 uidNext, std::function &)> callback, std::function progress = std::function()); - KAsync::Job fetchFlags(const Folder &folder, const KIMAP2::ImapSet &set, qint64 changedsince, std::function &)> callback); + KAsync::Job fetchMessages(const Folder &folder, std::function callback, std::function progress = std::function()); + KAsync::Job fetchMessages(const Folder &folder, qint64 uidNext, std::function callback, std::function progress = std::function()); + KAsync::Job fetchMessages(const Folder &folder, const QVector &uidsToFetch, std::function callback, std::function progress); + KAsync::Job fetchFlags(const Folder &folder, const KIMAP2::ImapSet &set, qint64 changedsince, std::function callback); KAsync::Job> fetchUids(const Folder &folder); private: -- cgit v1.2.3