diff options
-rw-r--r-- | common/genericresource.cpp | 160 | ||||
-rw-r--r-- | common/genericresource.h | 44 | ||||
-rw-r--r-- | examples/maildirresource/maildirresource.cpp | 223 | ||||
-rw-r--r-- | examples/maildirresource/maildirresource.h | 35 |
4 files changed, 239 insertions, 223 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 9fbcaaa..42153ec 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -4,12 +4,16 @@ | |||
4 | #include "pipeline.h" | 4 | #include "pipeline.h" |
5 | #include "queuedcommand_generated.h" | 5 | #include "queuedcommand_generated.h" |
6 | #include "createentity_generated.h" | 6 | #include "createentity_generated.h" |
7 | #include "modifyentity_generated.h" | ||
8 | #include "deleteentity_generated.h" | ||
7 | #include "domainadaptor.h" | 9 | #include "domainadaptor.h" |
8 | #include "commands.h" | 10 | #include "commands.h" |
9 | #include "index.h" | 11 | #include "index.h" |
10 | #include "log.h" | 12 | #include "log.h" |
11 | #include "definitions.h" | 13 | #include "definitions.h" |
12 | 14 | ||
15 | #include <QUuid> | ||
16 | |||
13 | static int sBatchSize = 100; | 17 | static int sBatchSize = 100; |
14 | 18 | ||
15 | using namespace Akonadi2; | 19 | using namespace Akonadi2; |
@@ -52,6 +56,7 @@ public: | |||
52 | { | 56 | { |
53 | const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly)); | 57 | const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly)); |
54 | const qint64 lastReplayedRevision = getLastReplayedRevision(); | 58 | const qint64 lastReplayedRevision = getLastReplayedRevision(); |
59 | Trace() << "All changes replayed " << topRevision << lastReplayedRevision; | ||
55 | return (lastReplayedRevision >= topRevision); | 60 | return (lastReplayedRevision >= topRevision); |
56 | } | 61 | } |
57 | 62 | ||
@@ -444,4 +449,159 @@ void GenericResource::setLowerBoundRevision(qint64 revision) | |||
444 | updateLowerBoundRevision(); | 449 | updateLowerBoundRevision(); |
445 | } | 450 | } |
446 | 451 | ||
452 | void GenericResource::createEntity(const QByteArray &akonadiId, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | ||
453 | { | ||
454 | //These changes are coming from the source | ||
455 | const auto replayToSource = false; | ||
456 | flatbuffers::FlatBufferBuilder entityFbb; | ||
457 | adaptorFactory.createBuffer(domainObject, entityFbb); | ||
458 | flatbuffers::FlatBufferBuilder fbb; | ||
459 | //This is the resource type and not the domain type | ||
460 | auto entityId = fbb.CreateString(akonadiId.toStdString()); | ||
461 | auto type = fbb.CreateString(bufferType.toStdString()); | ||
462 | auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); | ||
463 | auto location = Akonadi2::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); | ||
464 | Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); | ||
465 | callback(QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); | ||
466 | } | ||
467 | |||
468 | void GenericResource::modifyEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | ||
469 | { | ||
470 | //These changes are coming from the source | ||
471 | const auto replayToSource = false; | ||
472 | flatbuffers::FlatBufferBuilder entityFbb; | ||
473 | adaptorFactory.createBuffer(domainObject, entityFbb); | ||
474 | flatbuffers::FlatBufferBuilder fbb; | ||
475 | auto entityId = fbb.CreateString(akonadiId.toStdString()); | ||
476 | //This is the resource type and not the domain type | ||
477 | auto type = fbb.CreateString(bufferType.toStdString()); | ||
478 | auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); | ||
479 | //TODO removals | ||
480 | auto location = Akonadi2::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource); | ||
481 | Akonadi2::Commands::FinishModifyEntityBuffer(fbb, location); | ||
482 | callback(QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); | ||
483 | } | ||
484 | |||
485 | void GenericResource::deleteEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) | ||
486 | { | ||
487 | //These changes are coming from the source | ||
488 | const auto replayToSource = false; | ||
489 | flatbuffers::FlatBufferBuilder fbb; | ||
490 | auto entityId = fbb.CreateString(akonadiId.toStdString()); | ||
491 | //This is the resource type and not the domain type | ||
492 | auto type = fbb.CreateString(bufferType.toStdString()); | ||
493 | auto location = Akonadi2::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); | ||
494 | Akonadi2::Commands::FinishDeleteEntityBuffer(fbb, location); | ||
495 | callback(QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); | ||
496 | } | ||
497 | |||
498 | void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction) | ||
499 | { | ||
500 | Index index("rid.mapping." + bufferType, transaction); | ||
501 | Index localIndex("localid.mapping." + bufferType, transaction); | ||
502 | index.add(remoteId, localId); | ||
503 | localIndex.add(localId, remoteId); | ||
504 | } | ||
505 | |||
506 | void GenericResource::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction) | ||
507 | { | ||
508 | Index index("rid.mapping." + bufferType, transaction); | ||
509 | Index localIndex("localid.mapping." + bufferType, transaction); | ||
510 | index.remove(remoteId, localId); | ||
511 | localIndex.remove(localId, remoteId); | ||
512 | } | ||
513 | |||
514 | QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction) | ||
515 | { | ||
516 | //Lookup local id for remote id, or insert a new pair otherwise | ||
517 | Index index("rid.mapping." + bufferType, transaction); | ||
518 | Index localIndex("localid.mapping." + bufferType, transaction); | ||
519 | QByteArray akonadiId = index.lookup(remoteId); | ||
520 | if (akonadiId.isEmpty()) { | ||
521 | akonadiId = QUuid::createUuid().toString().toUtf8(); | ||
522 | index.add(remoteId, akonadiId); | ||
523 | localIndex.add(akonadiId, remoteId); | ||
524 | } | ||
525 | return akonadiId; | ||
526 | } | ||
527 | |||
528 | QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Akonadi2::Storage::Transaction &transaction) | ||
529 | { | ||
530 | Index index("localid.mapping." + bufferType, transaction); | ||
531 | QByteArray remoteId = index.lookup(localId); | ||
532 | if (remoteId.isEmpty()) { | ||
533 | Warning() << "Couldn't find the remote id for " << localId; | ||
534 | return QByteArray(); | ||
535 | } | ||
536 | return remoteId; | ||
537 | } | ||
538 | |||
539 | void GenericResource::scanForRemovals(Akonadi2::Storage::Transaction &transaction, Akonadi2::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists) | ||
540 | { | ||
541 | entryGenerator([this, &transaction, bufferType, &synchronizationTransaction, &exists](const QByteArray &key) { | ||
542 | auto akonadiId = Akonadi2::Storage::uidFromKey(key); | ||
543 | Trace() << "Checking for removal " << key; | ||
544 | const auto remoteId = resolveLocalId(bufferType, akonadiId, synchronizationTransaction); | ||
545 | //If we have no remoteId, the entity hasn't been replayed to the source yet | ||
546 | if (!remoteId.isEmpty()) { | ||
547 | if (!exists(remoteId)) { | ||
548 | Trace() << "Found a removed entity: " << akonadiId; | ||
549 | deleteEntity(akonadiId, Akonadi2::Storage::maxRevision(transaction), bufferType, [this](const QByteArray &buffer) { | ||
550 | enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::DeleteEntityCommand, buffer); | ||
551 | }); | ||
552 | } | ||
553 | } | ||
554 | }); | ||
555 | } | ||
556 | |||
557 | static QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> getLatest(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) | ||
558 | { | ||
559 | QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; | ||
560 | db.findLatest(uid, [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | ||
561 | Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
562 | if (!buffer.isValid()) { | ||
563 | Warning() << "Read invalid buffer from disk"; | ||
564 | } else { | ||
565 | current = adaptorFactory.createAdaptor(buffer.entity()); | ||
566 | } | ||
567 | return false; | ||
568 | }, | ||
569 | [](const Akonadi2::Storage::Error &error) { | ||
570 | Warning() << "Failed to read current value from storage: " << error.message; | ||
571 | }); | ||
572 | return current; | ||
573 | } | ||
574 | |||
575 | void GenericResource::createOrModify(Akonadi2::Storage::Transaction &transaction, Akonadi2::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Akonadi2::ApplicationDomain::ApplicationDomainType &entity) | ||
576 | { | ||
577 | auto mainDatabase = transaction.openDatabase(bufferType + ".main"); | ||
578 | const auto akonadiId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction); | ||
579 | const auto found = mainDatabase.contains(akonadiId); | ||
580 | if (!found) { | ||
581 | Trace() << "Found a new entity: " << remoteId; | ||
582 | createEntity(akonadiId, bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { | ||
583 | enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, buffer); | ||
584 | }); | ||
585 | } else { //modification | ||
586 | if (auto current = getLatest(mainDatabase, akonadiId, adaptorFactory)) { | ||
587 | bool changed = false; | ||
588 | for (const auto &property : entity.changedProperties()) { | ||
589 | if (entity.getProperty(property) != current->getProperty(property)) { | ||
590 | Trace() << "Property changed " << akonadiId << property; | ||
591 | changed = true; | ||
592 | } | ||
593 | } | ||
594 | if (changed) { | ||
595 | Trace() << "Found a modified entity: " << remoteId; | ||
596 | modifyEntity(akonadiId, Akonadi2::Storage::maxRevision(transaction), bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { | ||
597 | enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::ModifyEntityCommand, buffer); | ||
598 | }); | ||
599 | } | ||
600 | } else { | ||
601 | Warning() << "Failed to get current entity"; | ||
602 | } | ||
603 | } | ||
604 | } | ||
605 | |||
606 | |||
447 | #include "genericresource.moc" | 607 | #include "genericresource.moc" |
diff --git a/common/genericresource.h b/common/genericresource.h index ea68a25..c12c631 100644 --- a/common/genericresource.h +++ b/common/genericresource.h | |||
@@ -63,6 +63,50 @@ protected: | |||
63 | virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value); | 63 | virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value); |
64 | void onProcessorError(int errorCode, const QString &errorMessage); | 64 | void onProcessorError(int errorCode, const QString &errorMessage); |
65 | void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); | 65 | void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); |
66 | |||
67 | static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); | ||
68 | static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); | ||
69 | static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback); | ||
70 | |||
71 | /** | ||
72 | * Records a localId to remoteId mapping | ||
73 | */ | ||
74 | void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction); | ||
75 | void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction); | ||
76 | |||
77 | /** | ||
78 | * Tries to find a local id for the remote id, and creates a new local id otherwise. | ||
79 | * | ||
80 | * The new local id is recorded in the local to remote id mapping. | ||
81 | */ | ||
82 | QByteArray resolveRemoteId(const QByteArray &type, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction); | ||
83 | |||
84 | /** | ||
85 | * Tries to find a remote id for a local id. | ||
86 | * | ||
87 | * This can fail if the entity hasn't been written back to the server yet. | ||
88 | */ | ||
89 | QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Akonadi2::Storage::Transaction &transaction); | ||
90 | |||
91 | /** | ||
92 | * A synchronous algorithm to remove entities that are no longer existing. | ||
93 | * | ||
94 | * A list of entities is generated by @param entryGenerator. | ||
95 | * The entiry Generator typically iterates over an index to produce all existing entries. | ||
96 | * This algorithm calls @param exists for every entity of type @param type, with its remoteId. For every entity where @param exists returns false, | ||
97 | * an entity delete command is enqueued. | ||
98 | * | ||
99 | * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous. | ||
100 | */ | ||
101 | void scanForRemovals(Akonadi2::Storage::Transaction &transaction, Akonadi2::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists); | ||
102 | |||
103 | /** | ||
104 | * An algorithm to create or modify the entity. | ||
105 | * | ||
106 | * Depending on whether the entity is locally available, or has changed. | ||
107 | */ | ||
108 | void createOrModify(Akonadi2::Storage::Transaction &transaction, Akonadi2::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Akonadi2::ApplicationDomain::ApplicationDomainType &entity); | ||
109 | |||
66 | MessageQueue mUserQueue; | 110 | MessageQueue mUserQueue; |
67 | MessageQueue mSynchronizerQueue; | 111 | MessageQueue mSynchronizerQueue; |
68 | QByteArray mResourceInstanceIdentifier; | 112 | QByteArray mResourceInstanceIdentifier; |
diff --git a/examples/maildirresource/maildirresource.cpp b/examples/maildirresource/maildirresource.cpp index 6c6c5aa..273b996 100644 --- a/examples/maildirresource/maildirresource.cpp +++ b/examples/maildirresource/maildirresource.cpp | |||
@@ -63,46 +63,6 @@ MaildirResource::MaildirResource(const QByteArray &instanceIdentifier, const QSh | |||
63 | Trace() << "Started maildir resource for maildir: " << mMaildirPath; | 63 | Trace() << "Started maildir resource for maildir: " << mMaildirPath; |
64 | } | 64 | } |
65 | 65 | ||
66 | void MaildirResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction) | ||
67 | { | ||
68 | Index index("rid.mapping." + bufferType, transaction); | ||
69 | Index localIndex("localid.mapping." + bufferType, transaction); | ||
70 | index.add(remoteId, localId); | ||
71 | localIndex.add(localId, remoteId); | ||
72 | } | ||
73 | |||
74 | void MaildirResource::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction) | ||
75 | { | ||
76 | Index index("rid.mapping." + bufferType, transaction); | ||
77 | Index localIndex("localid.mapping." + bufferType, transaction); | ||
78 | index.remove(remoteId, localId); | ||
79 | localIndex.remove(localId, remoteId); | ||
80 | } | ||
81 | |||
82 | QString MaildirResource::resolveRemoteId(const QByteArray &bufferType, const QString &remoteId, Akonadi2::Storage::Transaction &transaction) | ||
83 | { | ||
84 | //Lookup local id for remote id, or insert a new pair otherwise | ||
85 | Index index("rid.mapping." + bufferType, transaction); | ||
86 | Index localIndex("localid.mapping." + bufferType, transaction); | ||
87 | QByteArray akonadiId = index.lookup(remoteId.toUtf8()); | ||
88 | if (akonadiId.isEmpty()) { | ||
89 | akonadiId = QUuid::createUuid().toString().toUtf8(); | ||
90 | index.add(remoteId.toUtf8(), akonadiId); | ||
91 | localIndex.add(akonadiId, remoteId.toUtf8()); | ||
92 | } | ||
93 | return akonadiId; | ||
94 | } | ||
95 | |||
96 | QString MaildirResource::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Akonadi2::Storage::Transaction &transaction) | ||
97 | { | ||
98 | Index index("localid.mapping." + bufferType, transaction); | ||
99 | QByteArray remoteId = index.lookup(localId); | ||
100 | if (remoteId.isEmpty()) { | ||
101 | Warning() << "Couldn't find the remote id for " << localId; | ||
102 | } | ||
103 | return remoteId; | ||
104 | } | ||
105 | |||
106 | static QStringList listRecursive( const QString &root, const KPIM::Maildir &dir ) | 66 | static QStringList listRecursive( const QString &root, const KPIM::Maildir &dir ) |
107 | { | 67 | { |
108 | QStringList list; | 68 | QStringList list; |
@@ -130,136 +90,28 @@ QStringList MaildirResource::listAvailableFolders() | |||
130 | return folderList; | 90 | return folderList; |
131 | } | 91 | } |
132 | 92 | ||
133 | static void createEntity(const QByteArray &akonadiId, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | ||
134 | { | ||
135 | //These changes are coming from the source | ||
136 | const auto replayToSource = false; | ||
137 | flatbuffers::FlatBufferBuilder entityFbb; | ||
138 | adaptorFactory.createBuffer(domainObject, entityFbb); | ||
139 | flatbuffers::FlatBufferBuilder fbb; | ||
140 | //This is the resource type and not the domain type | ||
141 | auto entityId = fbb.CreateString(akonadiId.toStdString()); | ||
142 | auto type = fbb.CreateString(bufferType.toStdString()); | ||
143 | auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); | ||
144 | auto location = Akonadi2::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); | ||
145 | Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); | ||
146 | callback(QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); | ||
147 | } | ||
148 | |||
149 | static void modifyEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | ||
150 | { | ||
151 | //These changes are coming from the source | ||
152 | const auto replayToSource = false; | ||
153 | flatbuffers::FlatBufferBuilder entityFbb; | ||
154 | adaptorFactory.createBuffer(domainObject, entityFbb); | ||
155 | flatbuffers::FlatBufferBuilder fbb; | ||
156 | auto entityId = fbb.CreateString(akonadiId.toStdString()); | ||
157 | //This is the resource type and not the domain type | ||
158 | auto type = fbb.CreateString(bufferType.toStdString()); | ||
159 | auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); | ||
160 | //TODO removals | ||
161 | auto location = Akonadi2::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource); | ||
162 | Akonadi2::Commands::FinishModifyEntityBuffer(fbb, location); | ||
163 | callback(QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); | ||
164 | } | ||
165 | |||
166 | static void deleteEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) | ||
167 | { | ||
168 | //These changes are coming from the source | ||
169 | const auto replayToSource = false; | ||
170 | flatbuffers::FlatBufferBuilder fbb; | ||
171 | auto entityId = fbb.CreateString(akonadiId.toStdString()); | ||
172 | //This is the resource type and not the domain type | ||
173 | auto type = fbb.CreateString(bufferType.toStdString()); | ||
174 | auto location = Akonadi2::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); | ||
175 | Akonadi2::Commands::FinishDeleteEntityBuffer(fbb, location); | ||
176 | callback(QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); | ||
177 | } | ||
178 | |||
179 | static QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> getLatest(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) | ||
180 | { | ||
181 | QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; | ||
182 | db.findLatest(uid, [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | ||
183 | Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
184 | if (!buffer.isValid()) { | ||
185 | Warning() << "Read invalid buffer from disk"; | ||
186 | } else { | ||
187 | current = adaptorFactory.createAdaptor(buffer.entity()); | ||
188 | } | ||
189 | return false; | ||
190 | }, | ||
191 | [](const Akonadi2::Storage::Error &error) { | ||
192 | Warning() << "Failed to read current value from storage: " << error.message; | ||
193 | }); | ||
194 | return current; | ||
195 | } | ||
196 | |||
197 | void MaildirResource::scanForRemovals(Akonadi2::Storage::Transaction &transaction, Akonadi2::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, std::function<bool(const QByteArray &remoteId)> exists) | ||
198 | { | ||
199 | auto mainDatabase = transaction.openDatabase(bufferType + ".main"); | ||
200 | //TODO Instead of iterating over all entries in the database, which can also pick up the same item multiple times, | ||
201 | //we should rather iterate over an index that contains every uid exactly once. The remoteId index would be such an index, | ||
202 | //but we currently fail to iterate over all entries in an index it seems. | ||
203 | // auto remoteIds = synchronizationTransaction.openDatabase("rid.mapping." + bufferType, std::function<void(const Akonadi2::Storage::Error &)>(), true); | ||
204 | mainDatabase.scan("", [this, &transaction, bufferType, &synchronizationTransaction, &exists](const QByteArray &key, const QByteArray &) { | ||
205 | auto akonadiId = Akonadi2::Storage::uidFromKey(key); | ||
206 | Trace() << "Checking for removal " << key; | ||
207 | const auto remoteId = resolveLocalId(bufferType, akonadiId, synchronizationTransaction); | ||
208 | if (!remoteId.isEmpty()) { | ||
209 | if (!exists(remoteId.toLatin1())) { | ||
210 | Trace() << "Found a removed entity: " << akonadiId; | ||
211 | deleteEntity(akonadiId, Akonadi2::Storage::maxRevision(transaction), bufferType, [this](const QByteArray &buffer) { | ||
212 | enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::DeleteEntityCommand, buffer); | ||
213 | }); | ||
214 | } | ||
215 | } | ||
216 | return true; | ||
217 | }, | ||
218 | [](const Akonadi2::Storage::Error &error) { | ||
219 | }); | ||
220 | |||
221 | } | ||
222 | |||
223 | void MaildirResource::createOrModify(Akonadi2::Storage::Transaction &transaction, Akonadi2::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Akonadi2::ApplicationDomain::ApplicationDomainType &entity) | ||
224 | { | ||
225 | auto mainDatabase = transaction.openDatabase(bufferType + ".main"); | ||
226 | const auto akonadiId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction).toLatin1(); | ||
227 | const auto found = mainDatabase.contains(akonadiId); | ||
228 | if (!found) { | ||
229 | Trace() << "Found a new entity: " << remoteId; | ||
230 | createEntity(akonadiId, bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { | ||
231 | enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, buffer); | ||
232 | }); | ||
233 | } else { //modification | ||
234 | if (auto current = getLatest(mainDatabase, akonadiId, adaptorFactory)) { | ||
235 | bool changed = false; | ||
236 | for (const auto &property : entity.changedProperties()) { | ||
237 | if (entity.getProperty(property) != current->getProperty(property)) { | ||
238 | Trace() << "Property changed " << akonadiId << property; | ||
239 | changed = true; | ||
240 | } | ||
241 | } | ||
242 | if (changed) { | ||
243 | Trace() << "Found a modified entity: " << remoteId; | ||
244 | modifyEntity(akonadiId, Akonadi2::Storage::maxRevision(transaction), bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { | ||
245 | enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::ModifyEntityCommand, buffer); | ||
246 | }); | ||
247 | } | ||
248 | } else { | ||
249 | Warning() << "Failed to get current entity"; | ||
250 | } | ||
251 | } | ||
252 | } | ||
253 | |||
254 | void MaildirResource::synchronizeFolders(Akonadi2::Storage::Transaction &transaction, Akonadi2::Storage::Transaction &synchronizationTransaction) | 93 | void MaildirResource::synchronizeFolders(Akonadi2::Storage::Transaction &transaction, Akonadi2::Storage::Transaction &synchronizationTransaction) |
255 | { | 94 | { |
256 | const QByteArray bufferType = ENTITY_TYPE_FOLDER; | 95 | const QByteArray bufferType = ENTITY_TYPE_FOLDER; |
257 | QStringList folderList = listAvailableFolders(); | 96 | QStringList folderList = listAvailableFolders(); |
258 | Trace() << "Found folders " << folderList; | 97 | Trace() << "Found folders " << folderList; |
259 | 98 | ||
260 | scanForRemovals(transaction, synchronizationTransaction, bufferType, [&folderList](const QByteArray &remoteId) -> bool { | 99 | scanForRemovals(transaction, synchronizationTransaction, bufferType, |
261 | return folderList.contains(remoteId); | 100 | [&bufferType, &transaction](const std::function<void(const QByteArray &)> &callback) { |
262 | }); | 101 | //TODO Instead of iterating over all entries in the database, which can also pick up the same item multiple times, |
102 | //we should rather iterate over an index that contains every uid exactly once. The remoteId index would be such an index, | ||
103 | //but we currently fail to iterate over all entries in an index it seems. | ||
104 | // auto remoteIds = synchronizationTransaction.openDatabase("rid.mapping." + bufferType, std::function<void(const Akonadi2::Storage::Error &)>(), true); | ||
105 | auto mainDatabase = transaction.openDatabase(bufferType + ".main"); | ||
106 | mainDatabase.scan("", [&](const QByteArray &key, const QByteArray &) { | ||
107 | callback(key); | ||
108 | return true; | ||
109 | }); | ||
110 | }, | ||
111 | [&folderList](const QByteArray &remoteId) -> bool { | ||
112 | return folderList.contains(remoteId); | ||
113 | } | ||
114 | ); | ||
263 | 115 | ||
264 | for (const auto folderPath : folderList) { | 116 | for (const auto folderPath : folderList) { |
265 | const auto remoteId = folderPath.toUtf8(); | 117 | const auto remoteId = folderPath.toUtf8(); |
@@ -270,7 +122,7 @@ void MaildirResource::synchronizeFolders(Akonadi2::Storage::Transaction &transac | |||
270 | folder.setProperty("name", md.name()); | 122 | folder.setProperty("name", md.name()); |
271 | folder.setProperty("icon", "folder"); | 123 | folder.setProperty("icon", "folder"); |
272 | if (!md.isRoot()) { | 124 | if (!md.isRoot()) { |
273 | folder.setProperty("parent", resolveRemoteId(ENTITY_TYPE_FOLDER, md.parent().path(), synchronizationTransaction).toLatin1()); | 125 | folder.setProperty("parent", resolveRemoteId(ENTITY_TYPE_FOLDER, md.parent().path().toUtf8(), synchronizationTransaction)); |
274 | } | 126 | } |
275 | createOrModify(transaction, synchronizationTransaction, *mFolderAdaptorFactory, bufferType, remoteId, folder); | 127 | createOrModify(transaction, synchronizationTransaction, *mFolderAdaptorFactory, bufferType, remoteId, folder); |
276 | } | 128 | } |
@@ -293,28 +145,23 @@ void MaildirResource::synchronizeMails(Akonadi2::Storage::Transaction &transacti | |||
293 | 145 | ||
294 | QFileInfo entryInfo; | 146 | QFileInfo entryInfo; |
295 | 147 | ||
296 | const auto folderLocalId = resolveRemoteId(ENTITY_TYPE_FOLDER, path, synchronizationTransaction); | 148 | const auto folderLocalId = resolveRemoteId(ENTITY_TYPE_FOLDER, path.toUtf8(), synchronizationTransaction); |
297 | |||
298 | auto exists = [&listingPath](const QByteArray &remoteId) -> bool { | ||
299 | return QFile(listingPath + "/" + remoteId).exists(); | ||
300 | }; | ||
301 | 149 | ||
302 | auto property = "folder"; | 150 | auto property = "folder"; |
303 | Index index(bufferType + ".index." + property, transaction); | 151 | scanForRemovals(transaction, synchronizationTransaction, bufferType, |
304 | index.lookup(folderLocalId.toLatin1(), [&](const QByteArray &akonadiId) { | 152 | [&](const std::function<void(const QByteArray &)> &callback) { |
305 | const auto remoteId = resolveLocalId(bufferType, akonadiId, synchronizationTransaction); | 153 | Index index(bufferType + ".index." + property, transaction); |
306 | if (!remoteId.isEmpty()) { | 154 | index.lookup(folderLocalId, [&](const QByteArray &akonadiId) { |
307 | if (!exists(remoteId.toLatin1())) { | 155 | callback(akonadiId); |
308 | Trace() << "Found a removed entity: " << akonadiId; | 156 | }, |
309 | deleteEntity(akonadiId, Akonadi2::Storage::maxRevision(transaction), bufferType, [this](const QByteArray &buffer) { | 157 | [&](const Index::Error &error) { |
310 | enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::DeleteEntityCommand, buffer); | 158 | Warning() << "Error in index: " << error.message << property; |
311 | }); | 159 | }); |
312 | } | 160 | }, |
161 | [&listingPath](const QByteArray &remoteId) -> bool { | ||
162 | return QFile(listingPath + "/" + remoteId).exists(); | ||
313 | } | 163 | } |
314 | }, | 164 | ); |
315 | [property](const Index::Error &error) { | ||
316 | Warning() << "Error in index: " << error.message << property; | ||
317 | }); | ||
318 | 165 | ||
319 | while (entryIterator->hasNext()) { | 166 | while (entryIterator->hasNext()) { |
320 | QString filePath = entryIterator->next(); | 167 | QString filePath = entryIterator->next(); |
@@ -398,7 +245,7 @@ KAsync::Job<void> MaildirResource::replay(const QByteArray &type, const QByteArr | |||
398 | Trace() << "Removing a folder: " << path; | 245 | Trace() << "Removing a folder: " << path; |
399 | KPIM::Maildir maildir(path, false); | 246 | KPIM::Maildir maildir(path, false); |
400 | maildir.remove(); | 247 | maildir.remove(); |
401 | removeRemoteId(ENTITY_TYPE_FOLDER, uid, remoteId.toUtf8(), synchronizationTransaction); | 248 | removeRemoteId(ENTITY_TYPE_FOLDER, uid, remoteId, synchronizationTransaction); |
402 | } else if (operation == Akonadi2::Operation_Modification) { | 249 | } else if (operation == Akonadi2::Operation_Modification) { |
403 | Warning() << "Folder modifications are not implemented"; | 250 | Warning() << "Folder modifications are not implemented"; |
404 | } else { | 251 | } else { |
@@ -419,9 +266,9 @@ KAsync::Job<void> MaildirResource::replay(const QByteArray &type, const QByteArr | |||
419 | auto parentFolder = mail.getProperty("folder").toByteArray(); | 266 | auto parentFolder = mail.getProperty("folder").toByteArray(); |
420 | QByteArray parentFolderRemoteId; | 267 | QByteArray parentFolderRemoteId; |
421 | if (!parentFolder.isEmpty()) { | 268 | if (!parentFolder.isEmpty()) { |
422 | parentFolderRemoteId = resolveLocalId(ENTITY_TYPE_FOLDER, parentFolder, synchronizationTransaction).toLatin1(); | 269 | parentFolderRemoteId = resolveLocalId(ENTITY_TYPE_FOLDER, parentFolder, synchronizationTransaction); |
423 | } else { | 270 | } else { |
424 | parentFolderRemoteId = mMaildirPath.toLatin1(); | 271 | parentFolderRemoteId = mMaildirPath.toUtf8(); |
425 | } | 272 | } |
426 | const auto parentFolderPath = parentFolderRemoteId; | 273 | const auto parentFolderPath = parentFolderRemoteId; |
427 | KPIM::Maildir maildir(parentFolderPath, false); | 274 | KPIM::Maildir maildir(parentFolderPath, false); |
@@ -439,7 +286,7 @@ KAsync::Job<void> MaildirResource::replay(const QByteArray &type, const QByteArr | |||
439 | KPIM::Maildir maildir(parentFolderPath, false); | 286 | KPIM::Maildir maildir(parentFolderPath, false); |
440 | Trace() << "Removing a mail: " << remoteId; | 287 | Trace() << "Removing a mail: " << remoteId; |
441 | maildir.removeEntry(remoteId); | 288 | maildir.removeEntry(remoteId); |
442 | removeRemoteId(ENTITY_TYPE_MAIL, uid, remoteId.toUtf8(), synchronizationTransaction); | 289 | removeRemoteId(ENTITY_TYPE_MAIL, uid, remoteId, synchronizationTransaction); |
443 | } else if (operation == Akonadi2::Operation_Modification) { | 290 | } else if (operation == Akonadi2::Operation_Modification) { |
444 | Warning() << "Mail modifications are not implemented"; | 291 | Warning() << "Mail modifications are not implemented"; |
445 | } else { | 292 | } else { |
diff --git a/examples/maildirresource/maildirresource.h b/examples/maildirresource/maildirresource.h index 9c205c8..48eac67 100644 --- a/examples/maildirresource/maildirresource.h +++ b/examples/maildirresource/maildirresource.h | |||
@@ -40,41 +40,6 @@ public: | |||
40 | private: | 40 | private: |
41 | KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; | 41 | KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; |
42 | 42 | ||
43 | /** | ||
44 | * Records a localId to remoteId mapping | ||
45 | */ | ||
46 | void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction); | ||
47 | void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction); | ||
48 | |||
49 | /** | ||
50 | * Tries to find a local id for the remote id, and creates a new local id otherwise. | ||
51 | * | ||
52 | * The new local id is recorded in the local to remote id mapping. | ||
53 | */ | ||
54 | QString resolveRemoteId(const QByteArray &type, const QString &remoteId, Akonadi2::Storage::Transaction &transaction); | ||
55 | |||
56 | /** | ||
57 | * Tries to find a remote id for a local id. | ||
58 | * | ||
59 | * This can fail if the entity hasn't been written back to the server yet. | ||
60 | */ | ||
61 | QString resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Akonadi2::Storage::Transaction &transaction); | ||
62 | |||
63 | /** | ||
64 | * An algorithm to remove entities that are no longer existing. | ||
65 | * | ||
66 | * This algorithm calls @param exists for every entity of type @param type, with its remoteId. For every entity where @param exists returns false, | ||
67 | * an entity delete command is enqueued. | ||
68 | */ | ||
69 | void scanForRemovals(Akonadi2::Storage::Transaction &transaction, Akonadi2::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, std::function<bool(const QByteArray &remoteId)> exists); | ||
70 | |||
71 | /** | ||
72 | * An algorithm to create or modify the entity. | ||
73 | * | ||
74 | * Depending on whether the entity is locally available, or has changed. | ||
75 | */ | ||
76 | void createOrModify(Akonadi2::Storage::Transaction &transaction, Akonadi2::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Akonadi2::ApplicationDomain::ApplicationDomainType &entity); | ||
77 | |||
78 | void synchronizeFolders(Akonadi2::Storage::Transaction &transaction, Akonadi2::Storage::Transaction &synchronizationTransaction); | 43 | void synchronizeFolders(Akonadi2::Storage::Transaction &transaction, Akonadi2::Storage::Transaction &synchronizationTransaction); |
79 | void synchronizeMails(Akonadi2::Storage::Transaction &transaction, Akonadi2::Storage::Transaction &synchronizationTransaction, const QString &folder); | 44 | void synchronizeMails(Akonadi2::Storage::Transaction &transaction, Akonadi2::Storage::Transaction &synchronizationTransaction, const QString &folder); |
80 | QStringList listAvailableFolders(); | 45 | QStringList listAvailableFolders(); |