summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2018-02-22 10:16:18 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2018-02-22 10:16:18 +0100
commit2beb09e8b3f4922a15ec0abde737d4cf9b9d4f8b (patch)
tree50115abf9b041d66ab9e4242af52cd16a7a9a3db
parent280b1250c0a038c2cf09fae3848ed0adefecc430 (diff)
downloadsink-2beb09e8b3f4922a15ec0abde737d4cf9b9d4f8b.tar.gz
sink-2beb09e8b3f4922a15ec0abde737d4cf9b9d4f8b.zip
Deal with removals in reduced queries
-rw-r--r--common/datastorequery.cpp22
-rw-r--r--common/datastorequery.h7
-rw-r--r--common/storage/entitystore.cpp2
-rw-r--r--tests/querytest.cpp44
4 files changed, 69 insertions, 6 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp
index 218796f..dacc8ab 100644
--- a/common/datastorequery.cpp
+++ b/common/datastorequery.cpp
@@ -300,11 +300,18 @@ public:
300 bool next(const std::function<void(const ResultSet::Result &)> &callback) Q_DECL_OVERRIDE { 300 bool next(const std::function<void(const ResultSet::Result &)> &callback) Q_DECL_OVERRIDE {
301 bool foundValue = false; 301 bool foundValue = false;
302 while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { 302 while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) {
303 if (result.operation == Sink::Operation_Removal) { 303 const auto reductionValue = [&] {
304 callback(result); 304 if (result.operation == Sink::Operation_Removal) {
305 return false; 305 //For removals we have to read the last revision to get a value, and thus be able to find the correct thread.
306 } 306 QVariant reductionValue;
307 auto reductionValue = result.entity.getProperty(mReductionProperty); 307 readPrevious(result.entity.identifier(), [&] (const ApplicationDomain::ApplicationDomainType &prev) {
308 reductionValue = prev.getProperty(mReductionProperty);
309 });
310 return reductionValue;
311 } else {
312 return result.entity.getProperty(mReductionProperty);
313 }
314 }();
308 const auto &reductionValueBa = getByteArray(reductionValue); 315 const auto &reductionValueBa = getByteArray(reductionValue);
309 if (!mReducedValues.contains(reductionValueBa)) { 316 if (!mReducedValues.contains(reductionValueBa)) {
310 //Only reduce every value once. 317 //Only reduce every value once.
@@ -439,6 +446,11 @@ void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &res
439 mStore.readLatest(mType, key, resultCallback); 446 mStore.readLatest(mType, key, resultCallback);
440} 447}
441 448
449void DataStoreQuery::readPrevious(const QByteArray &key, const std::function<void (const ApplicationDomain::ApplicationDomainType &)> &callback)
450{
451 mStore.readPrevious(mType, key, mStore.maxRevision(), callback);
452}
453
442QVector<QByteArray> DataStoreQuery::indexLookup(const QByteArray &property, const QVariant &value) 454QVector<QByteArray> DataStoreQuery::indexLookup(const QByteArray &property, const QVariant &value)
443{ 455{
444 return mStore.indexLookup(mType, property, value); 456 return mStore.indexLookup(mType, property, value);
diff --git a/common/datastorequery.h b/common/datastorequery.h
index 8800644..2311585 100644
--- a/common/datastorequery.h
+++ b/common/datastorequery.h
@@ -62,6 +62,7 @@ private:
62 QVector<QByteArray> indexLookup(const QByteArray &property, const QVariant &value); 62 QVector<QByteArray> indexLookup(const QByteArray &property, const QVariant &value);
63 63
64 void readEntity(const QByteArray &key, const BufferCallback &resultCallback); 64 void readEntity(const QByteArray &key, const BufferCallback &resultCallback);
65 void readPrevious(const QByteArray &key, const std::function<void (const Sink::ApplicationDomain::ApplicationDomainType &)> &callback);
65 66
66 ResultSet createFilteredSet(ResultSet &resultSet, const FilterFunction &); 67 ResultSet createFilteredSet(ResultSet &resultSet, const FilterFunction &);
67 QVector<QByteArray> loadIncrementalResultSet(qint64 baseRevision); 68 QVector<QByteArray> loadIncrementalResultSet(qint64 baseRevision);
@@ -107,6 +108,12 @@ public:
107 return mDatastore->indexLookup(property, value); 108 return mDatastore->indexLookup(property, value);
108 } 109 }
109 110
111 void readPrevious(const QByteArray &key, const std::function<void (const Sink::ApplicationDomain::ApplicationDomainType &)> &callback)
112 {
113 Q_ASSERT(mDatastore);
114 mDatastore->readPrevious(key, callback);
115 }
116
110 virtual void skip() { mSource->skip(); } 117 virtual void skip() { mSource->skip(); }
111 118
112 //Returns true for as long as a result is available 119 //Returns true for as long as a result is available
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp
index 020f3fd..7da7efa 100644
--- a/common/storage/entitystore.cpp
+++ b/common/storage/entitystore.cpp
@@ -563,7 +563,7 @@ void EntityStore::readPrevious(const QByteArray &type, const QByteArray &uid, qi
563 return true; 563 return true;
564 }, 564 },
565 [&](const Sink::Storage::DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read current value from storage: " << error.message; }, true); 565 [&](const Sink::Storage::DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read current value from storage: " << error.message; }, true);
566 return readEntity(type, Sink::Storage::DataStore::assembleKey(uid, latestRevision), callback); 566 readEntity(type, Sink::Storage::DataStore::assembleKey(uid, latestRevision), callback);
567} 567}
568 568
569void EntityStore::readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision, const std::function<void(const ApplicationDomain::ApplicationDomainType &)> callback) 569void EntityStore::readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision, const std::function<void(const ApplicationDomain::ApplicationDomainType &)> callback)
diff --git a/tests/querytest.cpp b/tests/querytest.cpp
index ec6438d..f65d477 100644
--- a/tests/querytest.cpp
+++ b/tests/querytest.cpp
@@ -760,6 +760,50 @@ private slots:
760 QTRY_COMPARE(model->rowCount(), 0); 760 QTRY_COMPARE(model->rowCount(), 0);
761 } 761 }
762 762
763 void testLivequeryRemoveOneInThread()
764 {
765 // Setup
766 auto folder1 = Folder::createEntity<Folder>("sink.dummy.instance1");
767 VERIFYEXEC(Sink::Store::create<Folder>(folder1));
768
769 auto mail1 = Mail::createEntity<Mail>("sink.dummy.instance1");
770 mail1.setExtractedMessageId("mail1");
771 mail1.setFolder(folder1);
772 VERIFYEXEC(Sink::Store::create(mail1));
773 auto mail2 = Mail::createEntity<Mail>("sink.dummy.instance1");
774 mail2.setExtractedMessageId("mail2");
775 mail2.setFolder(folder1);
776 VERIFYEXEC(Sink::Store::create(mail2));
777 // Ensure all local data is processed
778 VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1"));
779
780 //Setup two folders with a mail each, ensure we only get the mail from the folder that matches the folder filter.
781 Query query;
782 query.setId("testLivequeryUnmatch");
783 query.reduce<Mail::Folder>(Query::Reduce::Selector::max<Mail::Date>()).count("count").collect<Mail::Sender>("senders");
784 query.sort<Mail::Date>();
785 query.setFlags(Query::LiveQuery);
786 auto model = Sink::Store::loadModel<Mail>(query);
787 QTRY_COMPARE(model->rowCount(), 1);
788 QCOMPARE(model->data(model->index(0, 0, QModelIndex{}), Sink::Store::DomainObjectRole).value<Mail::Ptr>()->getProperty("count").toInt(), 2);
789
790 //After the removal, the thread size should be reduced by one
791 {
792
793 VERIFYEXEC(Sink::Store::remove(mail1));
794 }
795 VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1"));
796 QTRY_COMPARE(model->rowCount(), 1);
797 QTRY_COMPARE(model->data(model->index(0, 0, QModelIndex{}), Sink::Store::DomainObjectRole).value<Mail::Ptr>()->getProperty("count").toInt(), 1);
798
799 //After the second removal, the thread should be gone
800 {
801 VERIFYEXEC(Sink::Store::remove(mail2));
802 }
803 VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1"));
804 QTRY_COMPARE(model->rowCount(), 0);
805 }
806
763 void testDontUpdateNonLiveQuery() 807 void testDontUpdateNonLiveQuery()
764 { 808 {
765 // Setup 809 // Setup