From 47b9f2109f57c1121b760ea6d885ab08f12c46b3 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 26 Sep 2016 14:19:44 +0200 Subject: Blooming --- common/datastorequery.cpp | 37 +++++++++++++++++++++++++++++++++++++ common/query.h | 5 +++-- tests/mailthreadtest.cpp | 28 +++++++++++++++++++++++++--- 3 files changed, 65 insertions(+), 5 deletions(-) diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index 7c0fdea..f352b74 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp @@ -214,6 +214,39 @@ public: } }; +class Bloom : public FilterBase { +public: + typedef QSharedPointer Ptr; + + QByteArray mBloomProperty; + + Bloom(const QByteArray &bloomProperty, FilterBase::Ptr source, DataStoreQuery *store) + : FilterBase(source, store), + mBloomProperty(bloomProperty) + { + + } + + virtual ~Bloom(){} + + bool next(const std::function &callback) Q_DECL_OVERRIDE { + bool foundValue = false; + while(!foundValue && mSource->next([this, callback, &foundValue](Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { + auto bloomValue = getProperty(entityBuffer.entity(), mBloomProperty); + auto results = indexLookup(mBloomProperty, bloomValue); + for (const auto r : results) { + readEntity(r, [&, this](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { + callback(Sink::Operation_Creation, uid, entityBuffer); + foundValue = true; + }); + } + return false; + })) + {} + return foundValue; + } +}; + DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function getProperty) : mQuery(query), mTransaction(transaction), mType(type), mTypeIndex(typeIndex), mDb(Storage::mainDatabase(mTransaction, mType)), mGetProperty(getProperty) { @@ -383,6 +416,10 @@ void DataStoreQuery::setupQuery() auto reduce = Reduce::Ptr::create("threadId", "date", Reduce::Max, baseSet, this); baseSet = reduce; } + if (mQuery.bloomThread) { + auto reduce = Bloom::Ptr::create("threadId", baseSet, this); + baseSet = reduce; + } mCollector = Collector::Ptr::create(baseSet, this); } diff --git a/common/query.h b/common/query.h index 0808432..1b909e5 100644 --- a/common/query.h +++ b/common/query.h @@ -195,13 +195,13 @@ public: return *this; } - Query(const ApplicationDomain::Entity &value) : limit(0), liveQuery(false), synchronousQuery(false), threadLeaderOnly(false) + Query(const ApplicationDomain::Entity &value) : limit(0), liveQuery(false), synchronousQuery(false), threadLeaderOnly(false), bloomThread(false) { ids << value.identifier(); resources << value.resourceInstanceIdentifier(); } - Query(Flags flags = Flags()) : limit(0), liveQuery(false), synchronousQuery(false), threadLeaderOnly(false) + Query(Flags flags = Flags()) : limit(0), liveQuery(false), synchronousQuery(false), threadLeaderOnly(false), bloomThread(false) { } @@ -237,6 +237,7 @@ public: bool liveQuery; bool synchronousQuery; bool threadLeaderOnly; + bool bloomThread; }; } diff --git a/tests/mailthreadtest.cpp b/tests/mailthreadtest.cpp index a3df56b..89e5a85 100644 --- a/tests/mailthreadtest.cpp +++ b/tests/mailthreadtest.cpp @@ -133,6 +133,21 @@ void MailThreadTest::testIndexInMixedOrder() query.sort(); query.filter(folder); + Mail threadLeader; + + //Ensure we find the thread leader + { + auto job = Store::fetchAll(query) + .syncThen>([=, &threadLeader](const QList &mails) { + QCOMPARE(mails.size(), 1); + auto mail = *mails.first(); + threadLeader = mail; + QCOMPARE(mail.getSubject(), QString::fromLatin1("Re: Re: 1")); + }); + VERIFYEXEC(job); + } + + //Ensure we find the thread leader still { auto job = Store::fetchAll(query) .syncThen>([=](const QList &mails) { @@ -149,17 +164,24 @@ void MailThreadTest::testIndexInMixedOrder() mail.setFolder(folder); VERIFYEXEC(Store::create(mail)); } - VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); + + //Ensure the thread is complete { + Sink::Query query; + query.resources << mResourceInstanceIdentifier; + query.request().request().request().request(); + query.bloomThread = true; + query.sort(); + query.ids << threadLeader.identifier(); + auto job = Store::fetchAll(query) .syncThen>([=](const QList &mails) { - QCOMPARE(mails.size(), 1); + QCOMPARE(mails.size(), 2); auto mail = *mails.first(); QCOMPARE(mail.getSubject(), QString::fromLatin1("Re: Re: 1")); }); VERIFYEXEC(job); - //TODO ensure we also find message 1 as part of thread. } /* VERIFYEXEC(Store::remove(mail)); */ -- cgit v1.2.3