From 8e84c8a78b7e308cc2b09241af649851036d11de Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 10 Jan 2017 11:28:53 +0100 Subject: Improved imap mail sync algorithm. * when requesting individual mails we sync the full content * when requesting individual folders we get 2 weeks of full content + headers for everything else. * when requesting a sync for all folders we only get 2 weeks of full content. Getting the headers for 50k messages takes about 180s on my system with kolabnow (network being the bottleneck), so that's manageable. Getting the full content would take in the range of hours. This way we have something to show, and a way to request more data, without making the system overly complex yet. Certainly not the final solution, but a good start. --- examples/imapresource/imapresource.cpp | 293 ++++++++++++++++++++---------- examples/imapresource/imapserverproxy.cpp | 12 +- examples/imapresource/imapserverproxy.h | 2 +- 3 files changed, 202 insertions(+), 105 deletions(-) (limited to 'examples') diff --git a/examples/imapresource/imapresource.cpp b/examples/imapresource/imapresource.cpp index 4482a54..f07a62e 100644 --- a/examples/imapresource/imapresource.cpp +++ b/examples/imapresource/imapresource.cpp @@ -140,6 +140,24 @@ public: } } + static void setFlags(Sink::ApplicationDomain::Mail &mail, const KIMAP2::MessageFlags &flags) + { + mail.setUnread(!flags.contains(Imap::Flags::Seen)); + mail.setImportant(flags.contains(Imap::Flags::Flagged)); + } + + KIMAP2::MessageFlags getFlags(const Sink::ApplicationDomain::Mail &mail) + { + KIMAP2::MessageFlags flags; + if (!mail.getUnread()) { + flags << Imap::Flags::Seen; + } + if (mail.getImportant()) { + flags << Imap::Flags::Flagged; + } + return flags; + } + void synchronizeMails(const QByteArray &folderRid, const Message &message) { auto time = QSharedPointer::create(); @@ -158,8 +176,7 @@ public: auto mail = Sink::ApplicationDomain::Mail::create(mResourceInstanceIdentifier); 
mail.setFolder(folderLocalId); mail.setMimeMessage(message.msg->encodedContent()); - mail.setUnread(!message.flags.contains(Imap::Flags::Seen)); - mail.setImportant(message.flags.contains(Imap::Flags::Flagged)); + setFlags(mail, message.flags); createOrModify(bufferType, remoteId, mail); // const auto elapsed = time->elapsed(); @@ -198,76 +215,87 @@ public: SinkLog() << "Removed " << count << " mails in " << folderRid << Sink::Log::TraceTime(elapsed) << " " << elapsed/qMax(count, 1) << " [ms/mail]"; } - KAsync::Job synchronizeFolder(QSharedPointer imap, const Imap::Folder &folder, const QDate &dateFilter) + KAsync::Job synchronizeFolder(QSharedPointer imap, const Imap::Folder &folder, const QDate &dateFilter, bool fetchHeaderAlso = false) { - QSet uids; SinkLogCtx(mLogCtx) << "Synchronizing mails: " << folderRid(folder); - if (folder.path().isEmpty()) { + SinkLogCtx(mLogCtx) << " fetching headers also: " << fetchHeaderAlso; + const auto folderRemoteId = folderRid(folder); + if (folder.path().isEmpty() || folderRemoteId.isEmpty()) { + SinkWarningCtx(mLogCtx) << "Invalid folder " << folderRemoteId << folder.path(); return KAsync::error("Invalid folder"); } - auto capabilities = imap->getCapabilities(); - bool canDoIncrementalRemovals = false; + // auto capabilities = imap->getCapabilities(); + + //First we fetch flag changes for all messages. Since we don't know which messages are locally available we just get everything and only apply to what we have. return KAsync::start([=]() { - //First we fetch flag changes for all messages. Since we don't know which messages are locally available we just get everything and only apply to what we have. 
- auto uidNext = syncStore().readValue(folderRid(folder), "uidnext").toLongLong(); + auto uidNext = syncStore().readValue(folderRemoteId, "uidnext").toLongLong(); bool ok = false; - const auto changedsince = syncStore().readValue(folderRid(folder), "changedsince").toLongLong(&ok); - SinkLog() << "About to update flags" << folder.path() << "changedsince: " << changedsince; + const auto changedsince = syncStore().readValue(folderRemoteId, "changedsince").toLongLong(&ok); + SinkLogCtx(mLogCtx) << "About to update flags" << folder.path() << "changedsince: " << changedsince; + //If we have any mails so far we start off by updating any changed flags using changedsince if (ok) { - return imap->fetchFlags(folder, KIMAP2::ImapSet(1, qMax(uidNext, qint64(1))), changedsince, [this, folder](const Message &message) { - const auto folderLocalId = syncStore().resolveRemoteId(ENTITY_TYPE_FOLDER, folderRid(folder)); + return imap->fetchFlags(folder, KIMAP2::ImapSet(1, qMax(uidNext, qint64(1))), changedsince, [=](const Message &message) { + const auto folderLocalId = syncStore().resolveRemoteId(ENTITY_TYPE_FOLDER, folderRemoteId); const auto remoteId = assembleMailRid(folderLocalId, message.uid); - SinkLog() << "Updating mail flags " << remoteId << message.flags; + SinkLogCtx(mLogCtx) << "Updating mail flags " << remoteId << message.flags; auto mail = Sink::ApplicationDomain::Mail::create(mResourceInstanceIdentifier); - mail.setUnread(!message.flags.contains(Imap::Flags::Seen)); - mail.setImportant(message.flags.contains(Imap::Flags::Flagged)); + setFlags(mail, message.flags); modify(ENTITY_TYPE_MAIL, remoteId, mail); }) - .syncThen([this, folder](const SelectResult &selectResult) { - SinkLog() << "Flags updated. New changedsince value: " << selectResult.highestModSequence; - syncStore().writeValue(folderRid(folder), "changedsince", QByteArray::number(selectResult.highestModSequence)); + .syncThen([=](const SelectResult &selectResult) { + SinkLogCtx(mLogCtx) << "Flags updated. 
New changedsince value: " << selectResult.highestModSequence; + syncStore().writeValue(folderRemoteId, "changedsince", QByteArray::number(selectResult.highestModSequence)); }); } else { - //We hit this path on initial sync + //We hit this path on initial sync and simply record the current changedsince value return imap->select(imap->mailboxFromFolder(folder)) - .syncThen([this, folder](const SelectResult &selectResult) { - SinkLog() << "No flags to update. New changedsince value: " << selectResult.highestModSequence; - syncStore().writeValue(folderRid(folder), "changedsince", QByteArray::number(selectResult.highestModSequence)); + .syncThen([=](const SelectResult &selectResult) { + SinkLogCtx(mLogCtx) << "No flags to update. New changedsince value: " << selectResult.highestModSequence; + syncStore().writeValue(folderRemoteId, "changedsince", QByteArray::number(selectResult.highestModSequence)); }); } }) + //Next we synchronize the full set that is given by the date limit. + //We fetch all data for this set. + //This will also pull in any new messages in subsequent runs. 
.then([=]() { auto job = [&] { + SinkLogCtx(mLogCtx) << "Fetching messages since: " << dateFilter; if (dateFilter.isValid()) { return imap->fetchUidsSince(imap->mailboxFromFolder(folder), dateFilter); } else { return imap->fetchUids(imap->mailboxFromFolder(folder)); } }(); - return job.then>([this, folder, imap](const QVector &uidsToFetch) { - SinkTrace() << "Received result set " << uidsToFetch; - SinkTrace() << "About to fetch mail" << folder.path(); - const auto uidNext = syncStore().readValue(folderRid(folder), "uidnext").toLongLong(); + return job.then>([=](const QVector &uidsToFetch) { + SinkTraceCtx(mLogCtx) << "Received result set " << uidsToFetch; + SinkTraceCtx(mLogCtx) << "About to fetch mail" << folder.path(); + const auto uidNext = syncStore().readValue(folderRemoteId, "uidnext").toLongLong(); + + //Make sure the uids are sorted in reverse order and drop everything below uidNext (so we don't refetch what we already have QVector filteredAndSorted = uidsToFetch; qSort(filteredAndSorted.begin(), filteredAndSorted.end(), qGreater()); auto lowerBound = qLowerBound(filteredAndSorted.begin(), filteredAndSorted.end(), uidNext, qGreater()); if (lowerBound != filteredAndSorted.end()) { filteredAndSorted.erase(lowerBound, filteredAndSorted.end()); } + const qint64 lowerBoundUid = filteredAndSorted.isEmpty() ? 
0 : filteredAndSorted.last(); auto maxUid = QSharedPointer::create(0); if (!filteredAndSorted.isEmpty()) { *maxUid = filteredAndSorted.first(); } - SinkTrace() << "Uids to fetch: " << filteredAndSorted; - return imap->fetchMessages(folder, filteredAndSorted, [this, folder, maxUid](const Message &m) { + SinkTraceCtx(mLogCtx) << "Uids to fetch: " << filteredAndSorted; + + bool headersOnly = false; + return imap->fetchMessages(folder, filteredAndSorted, headersOnly, [=](const Message &m) { if (*maxUid < m.uid) { *maxUid = m.uid; } - synchronizeMails(folderRid(folder), m); + synchronizeMails(folderRemoteId, m); }, [this, maxUid, folder](int progress, int total) { SinkLog() << "Progress: " << progress << " out of " << total; @@ -276,30 +304,68 @@ public: commit(); } }) - .syncThen([this, maxUid, folder]() { - SinkLog() << "UIDMAX: " << *maxUid << folder.path(); + .syncThen([=]() { + SinkLogCtx(mLogCtx) << "UIDMAX: " << *maxUid << folder.path(); if (*maxUid > 0) { - syncStore().writeValue(folderRid(folder) + "uidnext", QByteArray::number(*maxUid)); + syncStore().writeValue(folderRemoteId, "uidnext", QByteArray::number(*maxUid)); } + syncStore().writeValue(folderRemoteId, "fullsetLowerbound", QByteArray::number(lowerBoundUid)); commit(); }); }); }) - .then([=]() { - //TODO Remove what's no longer existing - if (canDoIncrementalRemovals) { - //TODO do an examine with QRESYNC and remove VANISHED messages - } else { - return imap->fetchUids(folder).syncThen>([this, folder](const QVector &uids) { - SinkTrace() << "Syncing removals: " << folder.path(); - synchronizeRemovals(folderRid(folder), uids.toList().toSet()); + .then([=] { + bool ok = false; + const auto headersFetched = !syncStore().readValue(folderRemoteId, "headersFetched").isEmpty(); + const auto fullsetLowerbound = syncStore().readValue(folderRemoteId, "fullsetLowerbound").toLongLong(&ok); + if (ok && !headersFetched) { + SinkLogCtx(mLogCtx) << "Fetching headers until: " << fullsetLowerbound; + + return 
imap->fetchUids(imap->mailboxFromFolder(folder)) + .then>([=] (const QVector &uids) { + //sort in reverse order and remove everything greater than fullsetLowerbound + QVector toFetch = uids; + qSort(toFetch.begin(), toFetch.end(), qGreater()); + if (fullsetLowerbound) { + auto upperBound = qUpperBound(toFetch.begin(), toFetch.end(), fullsetLowerbound, qGreater()); + if (upperBound != toFetch.begin()) { + toFetch.erase(toFetch.begin(), upperBound); + } + } + SinkLogCtx(mLogCtx) << "Fetching headers for: " << toFetch; + + bool headersOnly = true; + return imap->fetchMessages(folder, toFetch, headersOnly, [=](const Message &m) { + synchronizeMails(folderRemoteId, m); + }, + [=](int progress, int total) { + SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total; + //commit every 100 messages + if ((progress % 100) == 0) { + commit(); + } + }); + }) + .syncThen([=]() { + SinkLogCtx(mLogCtx) << "Headers fetched: " << folder.path(); + syncStore().writeValue(folderRemoteId, "headersFetched", "true"); commit(); }); + + } else { + SinkLogCtx(mLogCtx) << "No additional headers to fetch."; } - return KAsync::null(); + return KAsync::null(); + }) + //Finally remove messages that are no longer existing on the server. 
+ .then([=]() { + //TODO do an examine with QRESYNC and remove VANISHED messages if supported instead + return imap->fetchUids(folder).syncThen>([=](const QVector &uids) { + SinkTraceCtx(mLogCtx) << "Syncing removals: " << folder.path(); + synchronizeRemovals(folderRemoteId, uids.toList().toSet()); + commit(); + }); }); - - } Sink::QueryBase applyMailDefaults(const Sink::QueryBase &query) @@ -337,6 +403,33 @@ public: }); } + KAsync::Job> getFolderList(QSharedPointer imap, const Sink::QueryBase &query) + { + if (query.hasFilter()) { + //If we have a folder filter fetch full payload of date-range & all headers + QVector folders; + auto folderFilter = query.getFilter(); + auto localIds = resolveFilter(folderFilter); + auto folderRemoteIds = syncStore().resolveLocalIds(ApplicationDomain::getTypeName(), localIds); + for (const auto &r : folderRemoteIds) { + folders << Folder{r}; + } + return KAsync::value(folders); + } else { + //Otherwise fetch full payload for daterange + auto folderList = QSharedPointer>::create(); + return imap->fetchFolders([folderList](const Folder &folder) { + if (!folder.noselect) { + *folderList << folder; + } + }) + .onError([](const KAsync::Error &error) { + SinkWarning() << "Folder list sync failed."; + }) + .syncThen>([folderList]() { return *folderList; } ); + } + } + KAsync::Job synchronizeWithSource(const Sink::QueryBase &query) Q_DECL_OVERRIDE { if (query.type() == ApplicationDomain::getTypeName()) { @@ -368,51 +461,63 @@ public: //* apply the date filter to the fetch //if we have no folder filter: //* fetch list of folders from server directly and sync (because we have no guarantee that the folder sync was already processed by the pipeline). 
- return KAsync::start([this, query]() { - auto imap = QSharedPointer::create(mServer, mPort); - auto job = login(imap); - job = job.then>([this, imap, query]() { - SinkLog() << "Login was successful"; - //FIXME If we were able to to flush in between we could just query the local store for the folder list. - // - if (query.hasFilter()) { - QVector folders; - auto folderFilter = query.getFilter(); - auto localIds = resolveFilter(folderFilter); - auto folderRemoteIds = syncStore().resolveLocalIds(ApplicationDomain::getTypeName(), localIds); - for (const auto &r : folderRemoteIds) { - folders << Folder{r}; + auto imap = QSharedPointer::create(mServer, mPort); + return login(imap) + .then([=] () -> KAsync::Job { + if (!query.ids().isEmpty()) { + //If we have mail id's simply fetch the full payload of those mails + QVector toFetch; + auto mailRemoteIds = syncStore().resolveLocalIds(ApplicationDomain::getTypeName(), query.ids()); + QByteArray folderRemoteId; + for (const auto &r : mailRemoteIds) { + const auto f = folderIdFromMailRid(r); + if (folderRemoteId.isEmpty()) { + folderRemoteId = f; + } else { + if (folderRemoteId != f) { + SinkWarningCtx(mLogCtx) << "Not all messages come from the same folder " << r << folderRemoteId; + } } - return KAsync::value(folders); - } else { - auto folderList = QSharedPointer>::create(); - return imap->fetchFolders([folderList](const Folder &folder) { - *folderList << folder; - }) - .onError([](const KAsync::Error &error) { - SinkWarning() << "Folder list sync failed."; - }) - .syncThen>([this, folderList]() { - return *folderList; - }); - } - }) - .serialEach([this, imap, query](const Folder &folder) { - if (folder.noselect) { - return KAsync::null(); + toFetch << uidFromMailRid(r); } - QDate dateFilter; - auto filter = query.getFilter(); - if (filter.value.canConvert()) { - dateFilter = filter.value.value(); - } - return synchronizeFolder(imap, folder, dateFilter) - .onError([folder](const KAsync::Error &error) { - SinkWarning() << 
"Failed to sync folder: " << folder.path() << "Error: " << error.errorMessage; + SinkLog() << "Fetching messages: " << toFetch; + bool headersOnly = false; + return imap->fetchMessages(Folder{folderRemoteId}, toFetch, headersOnly, [=](const Message &m) { + synchronizeMails(folderRemoteId, m); + }, + [=](int progress, int total) { + SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total; + //commit every 100 messages + if ((progress % 100) == 0) { + commit(); + } + }) + .syncThen([=]() { + commit(); + }); + } else { + //Otherwise we sync the folder(s) + bool syncHeaders = query.hasFilter(); + //FIXME If we were able to to flush in between we could just query the local store for the folder list. + return getFolderList(imap, query) + .then>([=] (const QVector &folders) { + //Synchronize folders + return KAsync::value(folders) + .serialEach([=](const Folder &folder) { + SinkLog() << "Syncing folder " << folder.path(); + QDate dateFilter; + auto filter = query.getFilter(); + if (filter.value.canConvert()) { + dateFilter = filter.value.value(); + SinkLog() << " with date-range " << dateFilter; + } + return synchronizeFolder(imap, folder, dateFilter, syncHeaders) + .onError([folder](const KAsync::Error &error) { + SinkWarning() << "Failed to sync folder: " << folder.path() << "Error: " << error.errorMessage; + }); }); - }); - - return job; + }); + } }); } return KAsync::error("Nothing to do"); @@ -425,13 +530,7 @@ public: if (operation == Sink::Operation_Creation) { QString mailbox = syncStore().resolveLocalId(ENTITY_TYPE_FOLDER, mail.getFolder()); QByteArray content = KMime::LFtoCRLF(mail.getMimeMessage()); - QByteArrayList flags; - if (!mail.getUnread()) { - flags << Imap::Flags::Seen; - } - if (mail.getImportant()) { - flags << Imap::Flags::Flagged; - } + auto flags = getFlags(mail); QDateTime internalDate = mail.getDate(); return login.then(imap->append(mailbox, content, flags, internalDate)) .addToContext(imap) @@ -458,13 +557,7 @@ public: SinkTrace() 
<< "Modifying a mail: " << oldRemoteId << " in the mailbox: " << mailbox << changedProperties; - QByteArrayList flags; - if (!mail.getUnread()) { - flags << Imap::Flags::Seen; - } - if (mail.getImportant()) { - flags << Imap::Flags::Flagged; - } + auto flags = getFlags(mail); const bool messageMoved = changedProperties.contains(ApplicationDomain::Mail::Folder::name); const bool messageChanged = changedProperties.contains(ApplicationDomain::Mail::MimeMessage::name); diff --git a/examples/imapresource/imapserverproxy.cpp b/examples/imapresource/imapserverproxy.cpp index af1f4d1..a172c93 100644 --- a/examples/imapresource/imapserverproxy.cpp +++ b/examples/imapresource/imapserverproxy.cpp @@ -441,18 +441,18 @@ KAsync::Job ImapServerProxy::fetchMessages(const Folder &folder, qint64 ui SinkTrace() << " Total: " << uidsToFetch.size(); SinkTrace() << " Uids to fetch: " << uidsToFetch; SinkTrace() << " Took: " << Sink::Log::TraceTime(time->elapsed()); - return fetchMessages(folder, uidsToFetch, callback, progress); + return fetchMessages(folder, uidsToFetch, false, callback, progress); }); }); } -KAsync::Job ImapServerProxy::fetchMessages(const Folder &folder, const QVector &uidsToFetch, std::function callback, std::function progress) +KAsync::Job ImapServerProxy::fetchMessages(const Folder &folder, const QVector &uidsToFetch, bool headersOnly, std::function callback, std::function progress) { auto time = QSharedPointer::create(); time->start(); Q_ASSERT(!mPersonalNamespaceSeparator.isNull()); - return select(mailboxFromFolder(folder)).then([this, callback, folder, time, progress, uidsToFetch](const SelectResult &selectResult) -> KAsync::Job { + return select(mailboxFromFolder(folder)).then([this, callback, folder, time, progress, uidsToFetch, headersOnly](const SelectResult &selectResult) -> KAsync::Job { SinkTrace() << "Fetching messages" << folder.path(); SinkTrace() << " Total: " << uidsToFetch.size(); @@ -467,7 +467,11 @@ KAsync::Job 
ImapServerProxy::fetchMessages(const Folder &folder, const QVe } KIMAP2::FetchJob::FetchScope scope; scope.parts.clear(); - scope.mode = KIMAP2::FetchJob::FetchScope::Full; + if (headersOnly) { + scope.mode = KIMAP2::FetchJob::FetchScope::Headers; + } else { + scope.mode = KIMAP2::FetchJob::FetchScope::Full; + } KIMAP2::ImapSet set; set.add(uidsToFetch); diff --git a/examples/imapresource/imapserverproxy.h b/examples/imapresource/imapserverproxy.h index fa2d022..140c5b2 100644 --- a/examples/imapresource/imapserverproxy.h +++ b/examples/imapresource/imapserverproxy.h @@ -163,7 +163,7 @@ public: KAsync::Job fetchFolders(std::function callback); KAsync::Job fetchMessages(const Folder &folder, std::function callback, std::function progress = std::function()); KAsync::Job fetchMessages(const Folder &folder, qint64 uidNext, std::function callback, std::function progress = std::function()); - KAsync::Job fetchMessages(const Folder &folder, const QVector &uidsToFetch, std::function callback, std::function progress); + KAsync::Job fetchMessages(const Folder &folder, const QVector &uidsToFetch, bool headersOnly, std::function callback, std::function progress); KAsync::Job fetchFlags(const Folder &folder, const KIMAP2::ImapSet &set, qint64 changedsince, std::function callback); KAsync::Job> fetchUids(const Folder &folder); -- cgit v1.2.3