summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2017-06-10 09:37:08 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2017-06-10 09:37:08 +0200
commite382924f1a90b5a27eba2e8c5981f6a4fe7892c9 (patch)
tree315780c4699be92d20121351bca4154c4fff8994
parentf254153ac1571d5ab31a4f17fba2db9256bc24d3 (diff)
downloadsink-e382924f1a90b5a27eba2e8c5981f6a4fe7892c9.tar.gz
sink-e382924f1a90b5a27eba2e8c5981f6a4fe7892c9.zip
Fixed incremental queries
The incremental querying broke as soon as a revision update came in since it would nuke the base-set. This fixes it, but it's definitely not pretty.
-rw-r--r--common/datastorequery.cpp41
-rw-r--r--common/datastorequery.h1
-rw-r--r--common/queryrunner.cpp6
-rw-r--r--tests/querytest.cpp27
4 files changed, 61 insertions, 14 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp
index 2e0c348..4c95606 100644
--- a/common/datastorequery.cpp
+++ b/common/datastorequery.cpp
@@ -43,6 +43,8 @@ class Source : public FilterBase {
43 43
44 QVector<QByteArray> mIds; 44 QVector<QByteArray> mIds;
45 QVector<QByteArray>::ConstIterator mIt; 45 QVector<QByteArray>::ConstIterator mIt;
46 QVector<QByteArray> mIncrementalIds;
47 QVector<QByteArray>::ConstIterator mIncrementalIt;
46 48
47 Source (const QVector<QByteArray> &ids, DataStoreQuery *store) 49 Source (const QVector<QByteArray> &ids, DataStoreQuery *store)
48 : FilterBase(store), 50 : FilterBase(store),
@@ -63,21 +65,36 @@ class Source : public FilterBase {
63 65
64 void add(const QVector<QByteArray> &ids) 66 void add(const QVector<QByteArray> &ids)
65 { 67 {
66 mIds = ids; 68 mIncrementalIds = ids;
67 mIt = mIds.constBegin(); 69 mIncrementalIt = mIncrementalIds.constBegin();
68 } 70 }
69 71
70 bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE 72 bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE
71 { 73 {
72 if (mIt == mIds.constEnd()) { 74 if (!mIncrementalIds.isEmpty()) {
73 return false; 75 if (mIncrementalIt == mIncrementalIds.constEnd()) {
76 return false;
77 }
78 readEntity(*mIncrementalIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
79 SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation);
80 callback({entity, operation});
81 });
82 mIncrementalIt++;
83 if (mIncrementalIt == mIncrementalIds.constEnd()) {
84 return false;
85 }
86 return true;
87 } else {
88 if (mIt == mIds.constEnd()) {
89 return false;
90 }
91 readEntity(*mIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
92 SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation);
93 callback({entity, operation});
94 });
95 mIt++;
96 return mIt != mIds.constEnd();
74 } 97 }
75 readEntity(*mIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
76 SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation);
77 callback({entity, operation});
78 });
79 mIt++;
80 return mIt != mIds.constEnd();
81 } 98 }
82}; 99};
83 100
@@ -599,6 +616,10 @@ ResultSet DataStoreQuery::update(qint64 baseRevision)
599 return ResultSet(generator, [this]() { mCollector->skip(); }); 616 return ResultSet(generator, [this]() { mCollector->skip(); });
600} 617}
601 618
619void DataStoreQuery::updateComplete()
620{
621 mSource->mIncrementalIds.clear();
622}
602 623
603ResultSet DataStoreQuery::execute() 624ResultSet DataStoreQuery::execute()
604{ 625{
diff --git a/common/datastorequery.h b/common/datastorequery.h
index ee5f99e..de4ae26 100644
--- a/common/datastorequery.h
+++ b/common/datastorequery.h
@@ -50,6 +50,7 @@ public:
50 ~DataStoreQuery(); 50 ~DataStoreQuery();
51 ResultSet execute(); 51 ResultSet execute();
52 ResultSet update(qint64 baseRevision); 52 ResultSet update(qint64 baseRevision);
53 void updateComplete();
53 54
54 State::Ptr getState(); 55 State::Ptr getState();
55 56
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 43f48c0..f196965 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -231,11 +231,11 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query
231 auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { 231 auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) {
232 resultProviderCallback(query, resultProvider, result); 232 resultProviderCallback(query, resultProvider, result);
233 }); 233 });
234 234 preparedQuery.updateComplete();
235 SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" 235 SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n"
236 << (replayResult.replayedAll ? "Replayed all available results.\n" : "") 236 << (replayResult.replayedAll ? "Replayed all available results.\n" : "")
237 << "Incremental query took: " << Log::TraceTime(time.elapsed()); 237 << "Incremental query took: " << Log::TraceTime(time.elapsed());
238 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()}; 238 return {entityStore.maxRevision(), replayResult.replayedEntities, false, preparedQuery.getState()};
239} 239}
240 240
241template <class DomainType> 241template <class DomainType>
@@ -264,7 +264,7 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery(
264 return DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName<DomainType>(), entityStore}; 264 return DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName<DomainType>(), entityStore};
265 } 265 }
266 }(); 266 }();
267 auto resultSet = preparedQuery.execute();; 267 auto resultSet = preparedQuery.execute();
268 268
269 SinkTraceCtx(mLogCtx) << "Filtered set retrieved." << Log::TraceTime(time.elapsed()); 269 SinkTraceCtx(mLogCtx) << "Filtered set retrieved." << Log::TraceTime(time.elapsed());
270 auto replayResult = resultSet.replaySet(0, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { 270 auto replayResult = resultSet.replaySet(0, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) {
diff --git a/tests/querytest.cpp b/tests/querytest.cpp
index 4ff1be8..714e549 100644
--- a/tests/querytest.cpp
+++ b/tests/querytest.cpp
@@ -381,6 +381,7 @@ private slots:
381 { 381 {
382 // Setup 382 // Setup
383 Folder::Ptr folderEntity; 383 Folder::Ptr folderEntity;
384 const auto date = QDateTime(QDate(2015, 7, 7), QTime(12, 0));
384 { 385 {
385 Folder folder("sink.dummy.instance1"); 386 Folder folder("sink.dummy.instance1");
386 Sink::Store::create<Folder>(folder).exec().waitForFinished(); 387 Sink::Store::create<Folder>(folder).exec().waitForFinished();
@@ -398,7 +399,6 @@ private slots:
398 folderEntity = model->index(0, 0).data(Sink::Store::DomainObjectRole).value<Folder::Ptr>(); 399 folderEntity = model->index(0, 0).data(Sink::Store::DomainObjectRole).value<Folder::Ptr>();
399 QVERIFY(!folderEntity->identifier().isEmpty()); 400 QVERIFY(!folderEntity->identifier().isEmpty());
400 401
401 const auto date = QDateTime(QDate(2015, 7, 7), QTime(12, 0));
402 { 402 {
403 Mail mail("sink.dummy.instance1"); 403 Mail mail("sink.dummy.instance1");
404 mail.setExtractedMessageId("testSecond"); 404 mail.setExtractedMessageId("testSecond");
@@ -428,6 +428,11 @@ private slots:
428 query.filter<Mail::Folder>(*folderEntity); 428 query.filter<Mail::Folder>(*folderEntity);
429 query.sort<Mail::Date>(); 429 query.sort<Mail::Date>();
430 query.limit(1); 430 query.limit(1);
431 query.setFlags(Query::LiveQuery);
432 query.reduce<ApplicationDomain::Mail::ThreadId>(Query::Reduce::Selector::max<ApplicationDomain::Mail::Date>())
433 .count("count")
434 .collect<ApplicationDomain::Mail::Unread>("unreadCollected")
435 .collect<ApplicationDomain::Mail::Important>("importantCollected");
431 436
432 // Ensure all local data is processed 437 // Ensure all local data is processed
433 VERIFYEXEC(Sink::ResourceControl::flushMessageQueue(QByteArrayList() << "sink.dummy.instance1")); 438 VERIFYEXEC(Sink::ResourceControl::flushMessageQueue(QByteArrayList() << "sink.dummy.instance1"));
@@ -443,6 +448,26 @@ private slots:
443 QCOMPARE(model->rowCount(), 2); 448 QCOMPARE(model->rowCount(), 2);
444 // We can't make any assumptions about the order of the indexes 449 // We can't make any assumptions about the order of the indexes
445 // QCOMPARE(model->index(1, 0).data(Sink::Store::DomainObjectRole).value<Mail::Ptr>()->getProperty("messageId").toByteArray(), QByteArray("testSecond")); 450 // QCOMPARE(model->index(1, 0).data(Sink::Store::DomainObjectRole).value<Mail::Ptr>()->getProperty("messageId").toByteArray(), QByteArray("testSecond"));
451
452 //New revisions always go through
453 {
454 Mail mail("sink.dummy.instance1");
455 mail.setExtractedMessageId("testInjected");
456 mail.setFolder(folderEntity->identifier());
457 mail.setExtractedDate(date.addDays(-2));
458 Sink::Store::create<Mail>(mail).exec().waitForFinished();
459 }
460 QTRY_COMPARE(model->rowCount(), 3);
461
462 //Ensure we can continue fetching after the incremental update
463 model->fetchMore(QModelIndex());
464 QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool());
465 QCOMPARE(model->rowCount(), 4);
466
467 //Ensure we have fetched all
468 model->fetchMore(QModelIndex());
469 QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool());
470 QCOMPARE(model->rowCount(), 4);
446 } 471 }
447 472
448 void testReactToNewResource() 473 void testReactToNewResource()