diff options
-rw-r--r-- | common/datastorequery.cpp | 41 | ||||
-rw-r--r-- | common/datastorequery.h | 1 | ||||
-rw-r--r-- | common/queryrunner.cpp | 6 | ||||
-rw-r--r-- | tests/querytest.cpp | 27 |
4 files changed, 61 insertions, 14 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index 2e0c348..4c95606 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp | |||
@@ -43,6 +43,8 @@ class Source : public FilterBase { | |||
43 | 43 | ||
44 | QVector<QByteArray> mIds; | 44 | QVector<QByteArray> mIds; |
45 | QVector<QByteArray>::ConstIterator mIt; | 45 | QVector<QByteArray>::ConstIterator mIt; |
46 | QVector<QByteArray> mIncrementalIds; | ||
47 | QVector<QByteArray>::ConstIterator mIncrementalIt; | ||
46 | 48 | ||
47 | Source (const QVector<QByteArray> &ids, DataStoreQuery *store) | 49 | Source (const QVector<QByteArray> &ids, DataStoreQuery *store) |
48 | : FilterBase(store), | 50 | : FilterBase(store), |
@@ -63,21 +65,36 @@ class Source : public FilterBase { | |||
63 | 65 | ||
64 | void add(const QVector<QByteArray> &ids) | 66 | void add(const QVector<QByteArray> &ids) |
65 | { | 67 | { |
66 | mIds = ids; | 68 | mIncrementalIds = ids; |
67 | mIt = mIds.constBegin(); | 69 | mIncrementalIt = mIncrementalIds.constBegin(); |
68 | } | 70 | } |
69 | 71 | ||
70 | bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE | 72 | bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE |
71 | { | 73 | { |
72 | if (mIt == mIds.constEnd()) { | 74 | if (!mIncrementalIds.isEmpty()) { |
73 | return false; | 75 | if (mIncrementalIt == mIncrementalIds.constEnd()) { |
76 | return false; | ||
77 | } | ||
78 | readEntity(*mIncrementalIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { | ||
79 | SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); | ||
80 | callback({entity, operation}); | ||
81 | }); | ||
82 | mIncrementalIt++; | ||
83 | if (mIncrementalIt == mIncrementalIds.constEnd()) { | ||
84 | return false; | ||
85 | } | ||
86 | return true; | ||
87 | } else { | ||
88 | if (mIt == mIds.constEnd()) { | ||
89 | return false; | ||
90 | } | ||
91 | readEntity(*mIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { | ||
92 | SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); | ||
93 | callback({entity, operation}); | ||
94 | }); | ||
95 | mIt++; | ||
96 | return mIt != mIds.constEnd(); | ||
74 | } | 97 | } |
75 | readEntity(*mIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { | ||
76 | SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); | ||
77 | callback({entity, operation}); | ||
78 | }); | ||
79 | mIt++; | ||
80 | return mIt != mIds.constEnd(); | ||
81 | } | 98 | } |
82 | }; | 99 | }; |
83 | 100 | ||
@@ -599,6 +616,10 @@ ResultSet DataStoreQuery::update(qint64 baseRevision) | |||
599 | return ResultSet(generator, [this]() { mCollector->skip(); }); | 616 | return ResultSet(generator, [this]() { mCollector->skip(); }); |
600 | } | 617 | } |
601 | 618 | ||
619 | void DataStoreQuery::updateComplete() | ||
620 | { | ||
621 | mSource->mIncrementalIds.clear(); | ||
622 | } | ||
602 | 623 | ||
603 | ResultSet DataStoreQuery::execute() | 624 | ResultSet DataStoreQuery::execute() |
604 | { | 625 | { |
diff --git a/common/datastorequery.h b/common/datastorequery.h index ee5f99e..de4ae26 100644 --- a/common/datastorequery.h +++ b/common/datastorequery.h | |||
@@ -50,6 +50,7 @@ public: | |||
50 | ~DataStoreQuery(); | 50 | ~DataStoreQuery(); |
51 | ResultSet execute(); | 51 | ResultSet execute(); |
52 | ResultSet update(qint64 baseRevision); | 52 | ResultSet update(qint64 baseRevision); |
53 | void updateComplete(); | ||
53 | 54 | ||
54 | State::Ptr getState(); | 55 | State::Ptr getState(); |
55 | 56 | ||
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 43f48c0..f196965 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -231,11 +231,11 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query | |||
231 | auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { | 231 | auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { |
232 | resultProviderCallback(query, resultProvider, result); | 232 | resultProviderCallback(query, resultProvider, result); |
233 | }); | 233 | }); |
234 | 234 | preparedQuery.updateComplete(); | |
235 | SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" | 235 | SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" |
236 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") | 236 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") |
237 | << "Incremental query took: " << Log::TraceTime(time.elapsed()); | 237 | << "Incremental query took: " << Log::TraceTime(time.elapsed()); |
238 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()}; | 238 | return {entityStore.maxRevision(), replayResult.replayedEntities, false, preparedQuery.getState()}; |
239 | } | 239 | } |
240 | 240 | ||
241 | template <class DomainType> | 241 | template <class DomainType> |
@@ -264,7 +264,7 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery( | |||
264 | return DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName<DomainType>(), entityStore}; | 264 | return DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName<DomainType>(), entityStore}; |
265 | } | 265 | } |
266 | }(); | 266 | }(); |
267 | auto resultSet = preparedQuery.execute();; | 267 | auto resultSet = preparedQuery.execute(); |
268 | 268 | ||
269 | SinkTraceCtx(mLogCtx) << "Filtered set retrieved." << Log::TraceTime(time.elapsed()); | 269 | SinkTraceCtx(mLogCtx) << "Filtered set retrieved." << Log::TraceTime(time.elapsed()); |
270 | auto replayResult = resultSet.replaySet(0, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { | 270 | auto replayResult = resultSet.replaySet(0, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { |
diff --git a/tests/querytest.cpp b/tests/querytest.cpp index 4ff1be8..714e549 100644 --- a/tests/querytest.cpp +++ b/tests/querytest.cpp | |||
@@ -381,6 +381,7 @@ private slots: | |||
381 | { | 381 | { |
382 | // Setup | 382 | // Setup |
383 | Folder::Ptr folderEntity; | 383 | Folder::Ptr folderEntity; |
384 | const auto date = QDateTime(QDate(2015, 7, 7), QTime(12, 0)); | ||
384 | { | 385 | { |
385 | Folder folder("sink.dummy.instance1"); | 386 | Folder folder("sink.dummy.instance1"); |
386 | Sink::Store::create<Folder>(folder).exec().waitForFinished(); | 387 | Sink::Store::create<Folder>(folder).exec().waitForFinished(); |
@@ -398,7 +399,6 @@ private slots: | |||
398 | folderEntity = model->index(0, 0).data(Sink::Store::DomainObjectRole).value<Folder::Ptr>(); | 399 | folderEntity = model->index(0, 0).data(Sink::Store::DomainObjectRole).value<Folder::Ptr>(); |
399 | QVERIFY(!folderEntity->identifier().isEmpty()); | 400 | QVERIFY(!folderEntity->identifier().isEmpty()); |
400 | 401 | ||
401 | const auto date = QDateTime(QDate(2015, 7, 7), QTime(12, 0)); | ||
402 | { | 402 | { |
403 | Mail mail("sink.dummy.instance1"); | 403 | Mail mail("sink.dummy.instance1"); |
404 | mail.setExtractedMessageId("testSecond"); | 404 | mail.setExtractedMessageId("testSecond"); |
@@ -428,6 +428,11 @@ private slots: | |||
428 | query.filter<Mail::Folder>(*folderEntity); | 428 | query.filter<Mail::Folder>(*folderEntity); |
429 | query.sort<Mail::Date>(); | 429 | query.sort<Mail::Date>(); |
430 | query.limit(1); | 430 | query.limit(1); |
431 | query.setFlags(Query::LiveQuery); | ||
432 | query.reduce<ApplicationDomain::Mail::ThreadId>(Query::Reduce::Selector::max<ApplicationDomain::Mail::Date>()) | ||
433 | .count("count") | ||
434 | .collect<ApplicationDomain::Mail::Unread>("unreadCollected") | ||
435 | .collect<ApplicationDomain::Mail::Important>("importantCollected"); | ||
431 | 436 | ||
432 | // Ensure all local data is processed | 437 | // Ensure all local data is processed |
433 | VERIFYEXEC(Sink::ResourceControl::flushMessageQueue(QByteArrayList() << "sink.dummy.instance1")); | 438 | VERIFYEXEC(Sink::ResourceControl::flushMessageQueue(QByteArrayList() << "sink.dummy.instance1")); |
@@ -443,6 +448,26 @@ private slots: | |||
443 | QCOMPARE(model->rowCount(), 2); | 448 | QCOMPARE(model->rowCount(), 2); |
444 | // We can't make any assumptions about the order of the indexes | 449 | // We can't make any assumptions about the order of the indexes |
445 | // QCOMPARE(model->index(1, 0).data(Sink::Store::DomainObjectRole).value<Mail::Ptr>()->getProperty("messageId").toByteArray(), QByteArray("testSecond")); | 450 | // QCOMPARE(model->index(1, 0).data(Sink::Store::DomainObjectRole).value<Mail::Ptr>()->getProperty("messageId").toByteArray(), QByteArray("testSecond")); |
451 | |||
452 | //New revisions always go through | ||
453 | { | ||
454 | Mail mail("sink.dummy.instance1"); | ||
455 | mail.setExtractedMessageId("testInjected"); | ||
456 | mail.setFolder(folderEntity->identifier()); | ||
457 | mail.setExtractedDate(date.addDays(-2)); | ||
458 | Sink::Store::create<Mail>(mail).exec().waitForFinished(); | ||
459 | } | ||
460 | QTRY_COMPARE(model->rowCount(), 3); | ||
461 | |||
462 | //Ensure we can continue fetching after the incremental update | ||
463 | model->fetchMore(QModelIndex()); | ||
464 | QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); | ||
465 | QCOMPARE(model->rowCount(), 4); | ||
466 | |||
467 | //Ensure we have fetched all | ||
468 | model->fetchMore(QModelIndex()); | ||
469 | QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); | ||
470 | QCOMPARE(model->rowCount(), 4); | ||
446 | } | 471 | } |
447 | 472 | ||
448 | void testReactToNewResource() | 473 | void testReactToNewResource() |