diff options
-rw-r--r-- | common/commandprocessor.cpp | 48 | ||||
-rw-r--r-- | common/commandprocessor.h | 1 | ||||
-rw-r--r-- | common/genericresource.cpp | 3 | ||||
-rw-r--r-- | common/notifier.cpp | 5 | ||||
-rw-r--r-- | common/notifier.h | 1 | ||||
-rw-r--r-- | common/synchronizer.cpp | 43 | ||||
-rw-r--r-- | common/synchronizer.h | 13 | ||||
-rw-r--r-- | tests/mailsynctest.cpp | 19 |
8 files changed, 72 insertions, 61 deletions
diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp index fccff22..8eb0ef1 100644 --- a/common/commandprocessor.cpp +++ b/common/commandprocessor.cpp | |||
@@ -128,17 +128,7 @@ void CommandProcessor::processSynchronizeCommand(const QByteArray &data) | |||
128 | QDataStream stream(&data, QIODevice::ReadOnly); | 128 | QDataStream stream(&data, QIODevice::ReadOnly); |
129 | stream >> query; | 129 | stream >> query; |
130 | } | 130 | } |
131 | synchronizeWithSource(query) | 131 | mSynchronizer->synchronize(query); |
132 | .then<void>([timer](const KAsync::Error &error) { | ||
133 | if (error) { | ||
134 | SinkWarning() << "Sync failed: " << error.errorMessage; | ||
135 | return KAsync::error(error); | ||
136 | } else { | ||
137 | SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); | ||
138 | return KAsync::null(); | ||
139 | } | ||
140 | }) | ||
141 | .exec(); | ||
142 | } else { | 132 | } else { |
143 | SinkWarning() << "received invalid command"; | 133 | SinkWarning() << "received invalid command"; |
144 | } | 134 | } |
@@ -156,34 +146,6 @@ void CommandProcessor::processSynchronizeCommand(const QByteArray &data) | |||
156 | // loadResource().setLowerBoundRevision(lowerBoundRevision()); | 146 | // loadResource().setLowerBoundRevision(lowerBoundRevision()); |
157 | // } | 147 | // } |
158 | 148 | ||
159 | KAsync::Job<void> CommandProcessor::synchronizeWithSource(const Sink::QueryBase &query) | ||
160 | { | ||
161 | return KAsync::start<void>([this, query] { | ||
162 | Sink::Notification n; | ||
163 | n.id = "sync"; | ||
164 | n.type = Sink::Notification::Status; | ||
165 | n.message = "Synchronization has started."; | ||
166 | n.code = Sink::ApplicationDomain::BusyStatus; | ||
167 | emit notify(n); | ||
168 | |||
169 | SinkLog() << " Synchronizing"; | ||
170 | return mSynchronizer->synchronize(query) | ||
171 | .then<void>([this](const KAsync::Error &error) { | ||
172 | if (!error) { | ||
173 | SinkLog() << "Done Synchronizing"; | ||
174 | Sink::Notification n; | ||
175 | n.id = "sync"; | ||
176 | n.type = Sink::Notification::Status; | ||
177 | n.message = "Synchronization has ended."; | ||
178 | n.code = Sink::ApplicationDomain::ConnectedStatus; | ||
179 | emit notify(n); | ||
180 | return KAsync::null(); | ||
181 | } | ||
182 | return KAsync::error(error); | ||
183 | }); | ||
184 | }); | ||
185 | } | ||
186 | |||
187 | void CommandProcessor::setOldestUsedRevision(qint64 revision) | 149 | void CommandProcessor::setOldestUsedRevision(qint64 revision) |
188 | { | 150 | { |
189 | mLowerBoundRevision = revision; | 151 | mLowerBoundRevision = revision; |
@@ -337,17 +299,17 @@ void CommandProcessor::setSynchronizer(const QSharedPointer<Synchronizer> &synch | |||
337 | QObject::connect(mSynchronizer.data(), &Synchronizer::replayingChanges, [this]() { | 299 | QObject::connect(mSynchronizer.data(), &Synchronizer::replayingChanges, [this]() { |
338 | Sink::Notification n; | 300 | Sink::Notification n; |
339 | n.id = "changereplay"; | 301 | n.id = "changereplay"; |
340 | n.type = Sink::Notification::Status; | 302 | n.type = Notification::Status; |
341 | n.message = "Replaying changes."; | 303 | n.message = "Replaying changes."; |
342 | n.code = Sink::ApplicationDomain::BusyStatus; | 304 | n.code = ApplicationDomain::BusyStatus; |
343 | emit notify(n); | 305 | emit notify(n); |
344 | }); | 306 | }); |
345 | QObject::connect(mSynchronizer.data(), &Synchronizer::changesReplayed, [this]() { | 307 | QObject::connect(mSynchronizer.data(), &Synchronizer::changesReplayed, [this]() { |
346 | Sink::Notification n; | 308 | Sink::Notification n; |
347 | n.id = "changereplay"; | 309 | n.id = "changereplay"; |
348 | n.type = Sink::Notification::Status; | 310 | n.type = Notification::Status; |
349 | n.message = "All changes have been replayed."; | 311 | n.message = "All changes have been replayed."; |
350 | n.code = Sink::ApplicationDomain::ConnectedStatus; | 312 | n.code = ApplicationDomain::ConnectedStatus; |
351 | emit notify(n); | 313 | emit notify(n); |
352 | }); | 314 | }); |
353 | 315 | ||
diff --git a/common/commandprocessor.h b/common/commandprocessor.h index a807f46..81f93e5 100644 --- a/common/commandprocessor.h +++ b/common/commandprocessor.h | |||
@@ -78,7 +78,6 @@ private: | |||
78 | // void processRevisionReplayedCommand(const QByteArray &data); | 78 | // void processRevisionReplayedCommand(const QByteArray &data); |
79 | 79 | ||
80 | KAsync::Job<void> flush(void const *command, size_t size); | 80 | KAsync::Job<void> flush(void const *command, size_t size); |
81 | KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query); | ||
82 | 81 | ||
83 | Sink::Pipeline *mPipeline; | 82 | Sink::Pipeline *mPipeline; |
84 | MessageQueue mUserQueue; | 83 | MessageQueue mUserQueue; |
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 5819a07..c11e899 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -100,7 +100,8 @@ void GenericResource::processCommand(int commandId, const QByteArray &data) | |||
100 | 100 | ||
101 | KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase &query) | 101 | KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase &query) |
102 | { | 102 | { |
103 | return mSynchronizer->synchronize(query); | 103 | mSynchronizer->synchronize(query); |
104 | return KAsync::null<void>(); | ||
104 | } | 105 | } |
105 | 106 | ||
106 | KAsync::Job<void> GenericResource::processAllMessages() | 107 | KAsync::Job<void> GenericResource::processAllMessages() |
diff --git a/common/notifier.cpp b/common/notifier.cpp index 94ac84e..53db5be 100644 --- a/common/notifier.cpp +++ b/common/notifier.cpp | |||
@@ -23,6 +23,7 @@ | |||
23 | #include <functional> | 23 | #include <functional> |
24 | 24 | ||
25 | #include "resourceaccess.h" | 25 | #include "resourceaccess.h" |
26 | #include "resourceconfig.h" | ||
26 | #include "log.h" | 27 | #include "log.h" |
27 | 28 | ||
28 | using namespace Sink; | 29 | using namespace Sink; |
@@ -60,6 +61,10 @@ Notifier::Notifier(const QByteArray &instanceIdentifier, const QByteArray &resou | |||
60 | d->resourceAccess << resourceAccess; | 61 | d->resourceAccess << resourceAccess; |
61 | } | 62 | } |
62 | 63 | ||
64 | Notifier::Notifier(const QByteArray &instanceIdentifier) : Notifier(instanceIdentifier, ResourceConfig::getResourceType(instanceIdentifier)) | ||
65 | { | ||
66 | } | ||
67 | |||
63 | void Notifier::registerHandler(std::function<void(const Notification &)> handler) | 68 | void Notifier::registerHandler(std::function<void(const Notification &)> handler) |
64 | { | 69 | { |
65 | d->handler << handler; | 70 | d->handler << handler; |
diff --git a/common/notifier.h b/common/notifier.h index 3d61e95..df8f34b 100644 --- a/common/notifier.h +++ b/common/notifier.h | |||
@@ -36,6 +36,7 @@ class SINK_EXPORT Notifier | |||
36 | { | 36 | { |
37 | public: | 37 | public: |
38 | Notifier(const QSharedPointer<ResourceAccess> &resourceAccess); | 38 | Notifier(const QSharedPointer<ResourceAccess> &resourceAccess); |
39 | Notifier(const QByteArray &resourceInstanceIdentifier); | ||
39 | Notifier(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType); | 40 | Notifier(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType); |
40 | void registerHandler(std::function<void(const Notification &)>); | 41 | void registerHandler(std::function<void(const Notification &)>); |
41 | 42 | ||
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 11c7caf..6483cdf 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp | |||
@@ -38,7 +38,8 @@ Synchronizer::Synchronizer(const Sink::ResourceContext &context) | |||
38 | : ChangeReplay(context), | 38 | : ChangeReplay(context), |
39 | mResourceContext(context), | 39 | mResourceContext(context), |
40 | mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext)), | 40 | mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext)), |
41 | mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite) | 41 | mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), |
42 | mSyncInProgress(false) | ||
42 | { | 43 | { |
43 | SinkTrace() << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); | 44 | SinkTrace() << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); |
44 | } | 45 | } |
@@ -254,15 +255,15 @@ void Synchronizer::modify(const DomainType &entity) | |||
254 | QList<Synchronizer::SyncRequest> Synchronizer::getSyncRequests(const Sink::QueryBase &query) | 255 | QList<Synchronizer::SyncRequest> Synchronizer::getSyncRequests(const Sink::QueryBase &query) |
255 | { | 256 | { |
256 | QList<Synchronizer::SyncRequest> list; | 257 | QList<Synchronizer::SyncRequest> list; |
257 | list << Synchronizer::SyncRequest{query}; | 258 | list << Synchronizer::SyncRequest{query, "sync"}; |
258 | return list; | 259 | return list; |
259 | } | 260 | } |
260 | 261 | ||
261 | KAsync::Job<void> Synchronizer::synchronize(const Sink::QueryBase &query) | 262 | void Synchronizer::synchronize(const Sink::QueryBase &query) |
262 | { | 263 | { |
263 | SinkTrace() << "Synchronizing"; | 264 | SinkTrace() << "Synchronizing"; |
264 | mSyncRequestQueue << getSyncRequests(query); | 265 | mSyncRequestQueue << getSyncRequests(query); |
265 | return processSyncQueue(); | 266 | processSyncQueue().exec(); |
266 | } | 267 | } |
267 | 268 | ||
268 | void Synchronizer::flush(int commandId, const QByteArray &flushId) | 269 | void Synchronizer::flush(int commandId, const QByteArray &flushId) |
@@ -284,20 +285,48 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
284 | while (!mSyncRequestQueue.isEmpty()) { | 285 | while (!mSyncRequestQueue.isEmpty()) { |
285 | auto request = mSyncRequestQueue.takeFirst(); | 286 | auto request = mSyncRequestQueue.takeFirst(); |
286 | if (request.requestType == Synchronizer::SyncRequest::Synchronization) { | 287 | if (request.requestType == Synchronizer::SyncRequest::Synchronization) { |
287 | job = job.then(synchronizeWithSource(request.query)).syncThen<void>([this] { | 288 | job = job.syncThen<void>([this, request] { |
289 | Sink::Notification n; | ||
290 | n.id = request.requestId; | ||
291 | n.type = Notification::Status; | ||
292 | n.message = "Synchronization has started."; | ||
293 | n.code = ApplicationDomain::BusyStatus; | ||
294 | emit notify(n); | ||
295 | }).then(synchronizeWithSource(request.query)).syncThen<void>([this] { | ||
288 | //Commit after every request, so implementations only have to commit more if they add a lot of data. | 296 | //Commit after every request, so implementations only have to commit more if they add a lot of data. |
289 | commit(); | 297 | commit(); |
298 | }).then<void>([this, request](const KAsync::Error &error) { | ||
299 | if (error) { | ||
300 | //Emit notification with error | ||
301 | SinkWarning() << "Synchronization failed: " << error.errorMessage; | ||
302 | Sink::Notification n; | ||
303 | n.id = request.requestId; | ||
304 | n.type = Notification::Status; | ||
305 | n.message = "Synchronization has ended."; | ||
306 | n.code = ApplicationDomain::ErrorStatus; | ||
307 | emit notify(n); | ||
308 | return KAsync::error(error); | ||
309 | } else { | ||
310 | SinkLog() << "Done Synchronizing"; | ||
311 | Sink::Notification n; | ||
312 | n.id = request.requestId; | ||
313 | n.type = Notification::Status; | ||
314 | n.message = "Synchronization has ended."; | ||
315 | n.code = ApplicationDomain::ConnectedStatus; | ||
316 | emit notify(n); | ||
317 | return KAsync::null(); | ||
318 | } | ||
290 | }); | 319 | }); |
291 | } else if (request.requestType == Synchronizer::SyncRequest::Flush) { | 320 | } else if (request.requestType == Synchronizer::SyncRequest::Flush) { |
292 | if (request.flushType == Flush::FlushReplayQueue) { | 321 | if (request.flushType == Flush::FlushReplayQueue) { |
293 | SinkTrace() << "Emitting flush completion."; | 322 | SinkTrace() << "Emitting flush completion."; |
294 | Sink::Notification n; | 323 | Sink::Notification n; |
295 | n.type = Sink::Notification::FlushCompletion; | 324 | n.type = Sink::Notification::FlushCompletion; |
296 | n.id = request.flushId; | 325 | n.id = request.requestId; |
297 | emit notify(n); | 326 | emit notify(n); |
298 | } else { | 327 | } else { |
299 | flatbuffers::FlatBufferBuilder fbb; | 328 | flatbuffers::FlatBufferBuilder fbb; |
300 | auto flushId = fbb.CreateString(request.flushId); | 329 | auto flushId = fbb.CreateString(request.requestId); |
301 | auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization)); | 330 | auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization)); |
302 | Sink::Commands::FinishFlushBuffer(fbb, location); | 331 | Sink::Commands::FinishFlushBuffer(fbb, location); |
303 | enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); | 332 | enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); |
diff --git a/common/synchronizer.h b/common/synchronizer.h index 989f902..f9b834e 100644 --- a/common/synchronizer.h +++ b/common/synchronizer.h | |||
@@ -44,7 +44,7 @@ public: | |||
44 | virtual ~Synchronizer(); | 44 | virtual ~Synchronizer(); |
45 | 45 | ||
46 | void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback, MessageQueue &messageQueue); | 46 | void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback, MessageQueue &messageQueue); |
47 | KAsync::Job<void> synchronize(const Sink::QueryBase &query); | 47 | void synchronize(const Sink::QueryBase &query); |
48 | void flush(int commandId, const QByteArray &flushId); | 48 | void flush(int commandId, const QByteArray &flushId); |
49 | 49 | ||
50 | //Read only access to main storage | 50 | //Read only access to main storage |
@@ -123,8 +123,9 @@ protected: | |||
123 | Flush | 123 | Flush |
124 | }; | 124 | }; |
125 | 125 | ||
126 | SyncRequest(const Sink::QueryBase &q) | 126 | SyncRequest(const Sink::QueryBase &q, const QByteArray &requestId_ = QByteArray()) |
127 | : requestType(Synchronization), | 127 | : requestId(requestId_), |
128 | requestType(Synchronization), | ||
128 | query(q) | 129 | query(q) |
129 | { | 130 | { |
130 | } | 131 | } |
@@ -134,15 +135,15 @@ protected: | |||
134 | { | 135 | { |
135 | } | 136 | } |
136 | 137 | ||
137 | SyncRequest(RequestType type, int flushType_, const QByteArray &flushId_) | 138 | SyncRequest(RequestType type, int flushType_, const QByteArray &requestId_) |
138 | : flushType(flushType_), | 139 | : flushType(flushType_), |
139 | flushId(flushId_), | 140 | requestId(requestId_), |
140 | requestType(type) | 141 | requestType(type) |
141 | { | 142 | { |
142 | } | 143 | } |
143 | 144 | ||
144 | int flushType = 0; | 145 | int flushType = 0; |
145 | QByteArray flushId; | 146 | QByteArray requestId; |
146 | RequestType requestType; | 147 | RequestType requestType; |
147 | Sink::QueryBase query; | 148 | Sink::QueryBase query; |
148 | }; | 149 | }; |
diff --git a/tests/mailsynctest.cpp b/tests/mailsynctest.cpp index 3927f14..72dbfd5 100644 --- a/tests/mailsynctest.cpp +++ b/tests/mailsynctest.cpp | |||
@@ -25,6 +25,8 @@ | |||
25 | 25 | ||
26 | #include "store.h" | 26 | #include "store.h" |
27 | #include "resourcecontrol.h" | 27 | #include "resourcecontrol.h" |
28 | #include "notifier.h" | ||
29 | #include "notification.h" | ||
28 | #include "log.h" | 30 | #include "log.h" |
29 | #include "test.h" | 31 | #include "test.h" |
30 | 32 | ||
@@ -411,10 +413,21 @@ void MailSyncTest::testFailingSync() | |||
411 | Sink::Query query; | 413 | Sink::Query query; |
412 | query.resourceFilter(resource.identifier()); | 414 | query.resourceFilter(resource.identifier()); |
413 | 415 | ||
416 | bool errorReceived = false; | ||
417 | |||
418 | //We currently don't get a proper error notification except for the status change | ||
419 | auto notifier = QSharedPointer<Sink::Notifier>::create(resource.identifier()); | ||
420 | notifier->registerHandler([&](const Notification ¬ification) { | ||
421 | SinkTrace() << "Received notification " << notification.type << notification.id; | ||
422 | if (notification.code == ApplicationDomain::ErrorStatus) { | ||
423 | errorReceived = true; | ||
424 | SinkWarning() << "Sync return an error"; | ||
425 | } | ||
426 | }); | ||
427 | |||
428 | VERIFYEXEC(Store::synchronize(query)); | ||
414 | // Ensure sync fails if resource is misconfigured | 429 | // Ensure sync fails if resource is misconfigured |
415 | auto future = Store::synchronize(query).exec(); | 430 | QTRY_VERIFY(errorReceived); |
416 | future.waitForFinished(); | ||
417 | QVERIFY(future.errorCode()); | ||
418 | } | 431 | } |
419 | 432 | ||
420 | #include "mailsynctest.moc" | 433 | #include "mailsynctest.moc" |