summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-29 11:27:04 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-29 11:27:04 +0100
commit6a072b2dcf23cbcdb210f2bd5c273ea0f425b188 (patch)
treecc2789c59e04018743aa0d575ee51a6f10869ffc /common
parent81b459c0f013704e95fb5933525c82a6ca46f13f (diff)
downloadsink-6a072b2dcf23cbcdb210f2bd5c273ea0f425b188.tar.gz
sink-6a072b2dcf23cbcdb210f2bd5c273ea0f425b188.zip
The synchronization call can be sync.
... because we really just enqueue the request and then wait for the notification.
Diffstat (limited to 'common')
-rw-r--r--common/commandprocessor.cpp48
-rw-r--r--common/commandprocessor.h1
-rw-r--r--common/genericresource.cpp3
-rw-r--r--common/notifier.cpp5
-rw-r--r--common/notifier.h1
-rw-r--r--common/synchronizer.cpp43
-rw-r--r--common/synchronizer.h13
7 files changed, 56 insertions, 58 deletions
diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp
index fccff22..8eb0ef1 100644
--- a/common/commandprocessor.cpp
+++ b/common/commandprocessor.cpp
@@ -128,17 +128,7 @@ void CommandProcessor::processSynchronizeCommand(const QByteArray &data)
128 QDataStream stream(&data, QIODevice::ReadOnly); 128 QDataStream stream(&data, QIODevice::ReadOnly);
129 stream >> query; 129 stream >> query;
130 } 130 }
131 synchronizeWithSource(query) 131 mSynchronizer->synchronize(query);
132 .then<void>([timer](const KAsync::Error &error) {
133 if (error) {
134 SinkWarning() << "Sync failed: " << error.errorMessage;
135 return KAsync::error(error);
136 } else {
137 SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed());
138 return KAsync::null();
139 }
140 })
141 .exec();
142 } else { 132 } else {
143 SinkWarning() << "received invalid command"; 133 SinkWarning() << "received invalid command";
144 } 134 }
@@ -156,34 +146,6 @@ void CommandProcessor::processSynchronizeCommand(const QByteArray &data)
156// loadResource().setLowerBoundRevision(lowerBoundRevision()); 146// loadResource().setLowerBoundRevision(lowerBoundRevision());
157// } 147// }
158 148
159KAsync::Job<void> CommandProcessor::synchronizeWithSource(const Sink::QueryBase &query)
160{
161 return KAsync::start<void>([this, query] {
162 Sink::Notification n;
163 n.id = "sync";
164 n.type = Sink::Notification::Status;
165 n.message = "Synchronization has started.";
166 n.code = Sink::ApplicationDomain::BusyStatus;
167 emit notify(n);
168
169 SinkLog() << " Synchronizing";
170 return mSynchronizer->synchronize(query)
171 .then<void>([this](const KAsync::Error &error) {
172 if (!error) {
173 SinkLog() << "Done Synchronizing";
174 Sink::Notification n;
175 n.id = "sync";
176 n.type = Sink::Notification::Status;
177 n.message = "Synchronization has ended.";
178 n.code = Sink::ApplicationDomain::ConnectedStatus;
179 emit notify(n);
180 return KAsync::null();
181 }
182 return KAsync::error(error);
183 });
184 });
185}
186
187void CommandProcessor::setOldestUsedRevision(qint64 revision) 149void CommandProcessor::setOldestUsedRevision(qint64 revision)
188{ 150{
189 mLowerBoundRevision = revision; 151 mLowerBoundRevision = revision;
@@ -337,17 +299,17 @@ void CommandProcessor::setSynchronizer(const QSharedPointer<Synchronizer> &synch
337 QObject::connect(mSynchronizer.data(), &Synchronizer::replayingChanges, [this]() { 299 QObject::connect(mSynchronizer.data(), &Synchronizer::replayingChanges, [this]() {
338 Sink::Notification n; 300 Sink::Notification n;
339 n.id = "changereplay"; 301 n.id = "changereplay";
340 n.type = Sink::Notification::Status; 302 n.type = Notification::Status;
341 n.message = "Replaying changes."; 303 n.message = "Replaying changes.";
342 n.code = Sink::ApplicationDomain::BusyStatus; 304 n.code = ApplicationDomain::BusyStatus;
343 emit notify(n); 305 emit notify(n);
344 }); 306 });
345 QObject::connect(mSynchronizer.data(), &Synchronizer::changesReplayed, [this]() { 307 QObject::connect(mSynchronizer.data(), &Synchronizer::changesReplayed, [this]() {
346 Sink::Notification n; 308 Sink::Notification n;
347 n.id = "changereplay"; 309 n.id = "changereplay";
348 n.type = Sink::Notification::Status; 310 n.type = Notification::Status;
349 n.message = "All changes have been replayed."; 311 n.message = "All changes have been replayed.";
350 n.code = Sink::ApplicationDomain::ConnectedStatus; 312 n.code = ApplicationDomain::ConnectedStatus;
351 emit notify(n); 313 emit notify(n);
352 }); 314 });
353 315
diff --git a/common/commandprocessor.h b/common/commandprocessor.h
index a807f46..81f93e5 100644
--- a/common/commandprocessor.h
+++ b/common/commandprocessor.h
@@ -78,7 +78,6 @@ private:
78 // void processRevisionReplayedCommand(const QByteArray &data); 78 // void processRevisionReplayedCommand(const QByteArray &data);
79 79
80 KAsync::Job<void> flush(void const *command, size_t size); 80 KAsync::Job<void> flush(void const *command, size_t size);
81 KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query);
82 81
83 Sink::Pipeline *mPipeline; 82 Sink::Pipeline *mPipeline;
84 MessageQueue mUserQueue; 83 MessageQueue mUserQueue;
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 5819a07..c11e899 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -100,7 +100,8 @@ void GenericResource::processCommand(int commandId, const QByteArray &data)
100 100
101KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase &query) 101KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase &query)
102{ 102{
103 return mSynchronizer->synchronize(query); 103 mSynchronizer->synchronize(query);
104 return KAsync::null<void>();
104} 105}
105 106
106KAsync::Job<void> GenericResource::processAllMessages() 107KAsync::Job<void> GenericResource::processAllMessages()
diff --git a/common/notifier.cpp b/common/notifier.cpp
index 94ac84e..53db5be 100644
--- a/common/notifier.cpp
+++ b/common/notifier.cpp
@@ -23,6 +23,7 @@
23#include <functional> 23#include <functional>
24 24
25#include "resourceaccess.h" 25#include "resourceaccess.h"
26#include "resourceconfig.h"
26#include "log.h" 27#include "log.h"
27 28
28using namespace Sink; 29using namespace Sink;
@@ -60,6 +61,10 @@ Notifier::Notifier(const QByteArray &instanceIdentifier, const QByteArray &resou
60 d->resourceAccess << resourceAccess; 61 d->resourceAccess << resourceAccess;
61} 62}
62 63
64Notifier::Notifier(const QByteArray &instanceIdentifier) : Notifier(instanceIdentifier, ResourceConfig::getResourceType(instanceIdentifier))
65{
66}
67
63void Notifier::registerHandler(std::function<void(const Notification &)> handler) 68void Notifier::registerHandler(std::function<void(const Notification &)> handler)
64{ 69{
65 d->handler << handler; 70 d->handler << handler;
diff --git a/common/notifier.h b/common/notifier.h
index 3d61e95..df8f34b 100644
--- a/common/notifier.h
+++ b/common/notifier.h
@@ -36,6 +36,7 @@ class SINK_EXPORT Notifier
36{ 36{
37public: 37public:
38 Notifier(const QSharedPointer<ResourceAccess> &resourceAccess); 38 Notifier(const QSharedPointer<ResourceAccess> &resourceAccess);
39 Notifier(const QByteArray &resourceInstanceIdentifier);
39 Notifier(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType); 40 Notifier(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType);
40 void registerHandler(std::function<void(const Notification &)>); 41 void registerHandler(std::function<void(const Notification &)>);
41 42
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 11c7caf..6483cdf 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -38,7 +38,8 @@ Synchronizer::Synchronizer(const Sink::ResourceContext &context)
38 : ChangeReplay(context), 38 : ChangeReplay(context),
39 mResourceContext(context), 39 mResourceContext(context),
40 mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext)), 40 mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext)),
41 mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite) 41 mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite),
42 mSyncInProgress(false)
42{ 43{
43 SinkTrace() << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); 44 SinkTrace() << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId();
44} 45}
@@ -254,15 +255,15 @@ void Synchronizer::modify(const DomainType &entity)
254QList<Synchronizer::SyncRequest> Synchronizer::getSyncRequests(const Sink::QueryBase &query) 255QList<Synchronizer::SyncRequest> Synchronizer::getSyncRequests(const Sink::QueryBase &query)
255{ 256{
256 QList<Synchronizer::SyncRequest> list; 257 QList<Synchronizer::SyncRequest> list;
257 list << Synchronizer::SyncRequest{query}; 258 list << Synchronizer::SyncRequest{query, "sync"};
258 return list; 259 return list;
259} 260}
260 261
261KAsync::Job<void> Synchronizer::synchronize(const Sink::QueryBase &query) 262void Synchronizer::synchronize(const Sink::QueryBase &query)
262{ 263{
263 SinkTrace() << "Synchronizing"; 264 SinkTrace() << "Synchronizing";
264 mSyncRequestQueue << getSyncRequests(query); 265 mSyncRequestQueue << getSyncRequests(query);
265 return processSyncQueue(); 266 processSyncQueue().exec();
266} 267}
267 268
268void Synchronizer::flush(int commandId, const QByteArray &flushId) 269void Synchronizer::flush(int commandId, const QByteArray &flushId)
@@ -284,20 +285,48 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
284 while (!mSyncRequestQueue.isEmpty()) { 285 while (!mSyncRequestQueue.isEmpty()) {
285 auto request = mSyncRequestQueue.takeFirst(); 286 auto request = mSyncRequestQueue.takeFirst();
286 if (request.requestType == Synchronizer::SyncRequest::Synchronization) { 287 if (request.requestType == Synchronizer::SyncRequest::Synchronization) {
287 job = job.then(synchronizeWithSource(request.query)).syncThen<void>([this] { 288 job = job.syncThen<void>([this, request] {
289 Sink::Notification n;
290 n.id = request.requestId;
291 n.type = Notification::Status;
292 n.message = "Synchronization has started.";
293 n.code = ApplicationDomain::BusyStatus;
294 emit notify(n);
295 }).then(synchronizeWithSource(request.query)).syncThen<void>([this] {
288 //Commit after every request, so implementations only have to commit more if they add a lot of data. 296 //Commit after every request, so implementations only have to commit more if they add a lot of data.
289 commit(); 297 commit();
298 }).then<void>([this, request](const KAsync::Error &error) {
299 if (error) {
300 //Emit notification with error
301 SinkWarning() << "Synchronization failed: " << error.errorMessage;
302 Sink::Notification n;
303 n.id = request.requestId;
304 n.type = Notification::Status;
305 n.message = "Synchronization has ended.";
306 n.code = ApplicationDomain::ErrorStatus;
307 emit notify(n);
308 return KAsync::error(error);
309 } else {
310 SinkLog() << "Done Synchronizing";
311 Sink::Notification n;
312 n.id = request.requestId;
313 n.type = Notification::Status;
314 n.message = "Synchronization has ended.";
315 n.code = ApplicationDomain::ConnectedStatus;
316 emit notify(n);
317 return KAsync::null();
318 }
290 }); 319 });
291 } else if (request.requestType == Synchronizer::SyncRequest::Flush) { 320 } else if (request.requestType == Synchronizer::SyncRequest::Flush) {
292 if (request.flushType == Flush::FlushReplayQueue) { 321 if (request.flushType == Flush::FlushReplayQueue) {
293 SinkTrace() << "Emitting flush completion."; 322 SinkTrace() << "Emitting flush completion.";
294 Sink::Notification n; 323 Sink::Notification n;
295 n.type = Sink::Notification::FlushCompletion; 324 n.type = Sink::Notification::FlushCompletion;
296 n.id = request.flushId; 325 n.id = request.requestId;
297 emit notify(n); 326 emit notify(n);
298 } else { 327 } else {
299 flatbuffers::FlatBufferBuilder fbb; 328 flatbuffers::FlatBufferBuilder fbb;
300 auto flushId = fbb.CreateString(request.flushId); 329 auto flushId = fbb.CreateString(request.requestId);
301 auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization)); 330 auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization));
302 Sink::Commands::FinishFlushBuffer(fbb, location); 331 Sink::Commands::FinishFlushBuffer(fbb, location);
303 enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); 332 enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb));
diff --git a/common/synchronizer.h b/common/synchronizer.h
index 989f902..f9b834e 100644
--- a/common/synchronizer.h
+++ b/common/synchronizer.h
@@ -44,7 +44,7 @@ public:
44 virtual ~Synchronizer(); 44 virtual ~Synchronizer();
45 45
46 void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback, MessageQueue &messageQueue); 46 void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback, MessageQueue &messageQueue);
47 KAsync::Job<void> synchronize(const Sink::QueryBase &query); 47 void synchronize(const Sink::QueryBase &query);
48 void flush(int commandId, const QByteArray &flushId); 48 void flush(int commandId, const QByteArray &flushId);
49 49
50 //Read only access to main storage 50 //Read only access to main storage
@@ -123,8 +123,9 @@ protected:
123 Flush 123 Flush
124 }; 124 };
125 125
126 SyncRequest(const Sink::QueryBase &q) 126 SyncRequest(const Sink::QueryBase &q, const QByteArray &requestId_ = QByteArray())
127 : requestType(Synchronization), 127 : requestId(requestId_),
128 requestType(Synchronization),
128 query(q) 129 query(q)
129 { 130 {
130 } 131 }
@@ -134,15 +135,15 @@ protected:
134 { 135 {
135 } 136 }
136 137
137 SyncRequest(RequestType type, int flushType_, const QByteArray &flushId_) 138 SyncRequest(RequestType type, int flushType_, const QByteArray &requestId_)
138 : flushType(flushType_), 139 : flushType(flushType_),
139 flushId(flushId_), 140 requestId(requestId_),
140 requestType(type) 141 requestType(type)
141 { 142 {
142 } 143 }
143 144
144 int flushType = 0; 145 int flushType = 0;
145 QByteArray flushId; 146 QByteArray requestId;
146 RequestType requestType; 147 RequestType requestType;
147 Sink::QueryBase query; 148 Sink::QueryBase query;
148 }; 149 };