Fini de rire. Cette fois, nous allons voir ce que nous pouvons faire pour synchroniser des listes qui contiennent des dizaines, voire des centaines de milliers de lignes. D'ailleurs, dans notre exemple la liste aura un million de lignes ! Nous partirons des hypothèses suivantes:
- Les listes ne peuvent pas être chargées en mémoire en entier,
- Les listes ne sont pas triées (dans notre exemple, ce sera le cas, mais nous n'utiliserons pas cette caractéristique, à dessein.
Chaque objet est muni d'un identifiant immutable et d'un contenu qui lui, peut varier. Comme d'habitude, nous chercherons à obtenir les identifiants:
- des objets créées (les objets figurants dans la seconde liste et pas dans la première)
- des objets supprimés (les objets figurants dans la seconde liste et pas dans la première)
- des objets modifiés (les objets figurants dans les deux listes, mais dont la partie contenu a été modifiée)
Regardons les options à notre disposition pour comparer les deux listes. Je n'en vois que deux:
- parcourir simultanément les deux listes après les avoir triées selon leurs identifiants. En suivant la progression des identifiants, il est possible de sérier les objets selon leur présence et l'évolution de leur contenu.
- découper les deux listes en morceaux de tailles similaires suffisamment réduites pour pouvoir être chargées en entier et traitées. Il faut de plus que les frontières de découpe soient strictement les mêmes pour les deux listes.
A priori, la première option est la plus intéressante et pourtant, c'est la seconde que l'on va lui préférer. La première présente deux contraintes rédhibitoires, alors que les contraintes de la secondes, à priori très lourdes peuvent être résolues à l'aide d'une astuce mathématique des plus utiles.
La première option présente deux problèmes majeurs:
- d'abord il faut trier les listes. Il existe bien une méthode pour cela (sort) mais elle implique de charger toute la liste, ce qui est contraire aux hypothèses de départ.
- ensuite, il faut parcourir simultanément les deux listes, sans les charger dans leur totalité. Il existe bien un mécanisme pour cela: le streaming. Certes. Mais le streaming ne peut s'appliquer qu'à une liste, pas aux deux simultanément !
Bon, si on écarte la première option, comment résoudre les contraintes de la seconde ? L'idée est de découper chaque liste en appliquant un modulo sur leur identifiant. On prend pour cela des paquets de taille modérée (10k dans notre exemple), et on les ventile en fonction en un nombre déterminé à l'avance (20 dans notre exemple) groupes. On ajoute ses groupes aux groupes des paquets précédents, stockés dans des enregistrements Object Store. Cette manière de faire présente de nombreux avantages:
- on ne traite qu'une liste à la fois, lors de la découpe: le streaming est donc applicable. L'identification des évolutions n'a lieu que dans un second temps, par paquet suffisamment réduit pour pouvoir être chargés entièrement
- nul besoin de trier quoi que ce soit
- il est facile de maîtriser les coupes et de les aligner sur les deux listes
- l'implémentation est plus simple.
Certes, mais certains objecteront - avec raison - que tout cela ne marche que si deux suppositions se révèlent exactes:
- les identifiants sont numériques
- la distribution des nombres que contiennent ces champs sont à peu près uniformes (afin de s'assurer que les paquets générés soient de tailles similaires)
Même si dans beaucoup de cas, on peut imaginer que ces deux caractères puissent être vérifiés, il existe un moyen - la fameuse "astuce mathématique" dont il est question plus haut - pour le garantir dans tous les cas: il suffit d'appliquer le modulo non pas directement sur l'identifiant, mais sur un checksum généré à partir de cet identifiant, en utilisant un algorithme qui en assure l'uniformité, (comme MD5) et le tour est joué !
D'ailleurs MD5 va nous être utile pour résoudre un autre problème. Rappelez vous, nous avons dit que nous allions stocker les objets dans des enregistrements Object Store. Très bien. Mais la taille de ces enregistrements est limitée. Que faire si l'on doit parcourir des listes d'objets de grandes tailles. Pire: comment gérer le cas ou cette taille peut beaucoup, beaucoup varier? Et puis, comment allons nous faire pour comparer le contenu de deux objets de même identifiant ? champ par champ, récursivement ? Et bien, à toutes ces questions, nous avons un unique et géniale réponde: MD5. Plutôt que de stocker les objets en entier avec tous les problèmes potentiels vus plus haut, nous allons juste stocker leur identifiant et le checksum de leur contenu:
- la taille de chaque enregistrement est complètement maîtrisé
- les comparaisons sonr ridiculement simples: il suffit de comparer deux chaînes de caractères!
Je tiens ici à (re)rassurer ceux qui s'inquiètent d'un téléscopage de la valeur d'un checksum: le risque s'enfonce dans l'infiniment petit, dans des proportions cosmiques (on parle de milliards de milliards de milliard d'années avant que çà n'arrive !).
Notre flux s'implémente de la façon suivante:
Noter que ce flux utilise deux boucles imbriquées:
- La première découpe le flux en entrée, par paquets de 10k lignes
- Ces 10k lignes sont regroupées en fonction du modulo du MD5 de leurs identifiants respectifs (transformer ProcessList)
- La seconde boucle itère sur les regroupements pour les ajouter aux enregistrements Object Stores idoines.
L'essentiel de la magie est dans le transformer "Process List":
%dw 2.0
import dw::Crypto
import * from dw::core::Numbers
output application/json
var nbr = vars.attributes.nbr as Number
---
((payload
map (i)->{
id: i.id,
key: (fromHex(Crypto::MD5(write(i.id) as Binary)) mod nbr) as String,
value: Crypto::MD5(write(i) as Binary)
})
groupBy (i)->i.key)
pluck $
map (l)-> l reduce(r, a={key:l[0].key, lines:{}})->{
key:r.key,
lines:(a.lines ++ (r.id): r.value)
}
La partie conservation dans Object Store est très simple. La seule chose à noter est que la clé de l'enregistrement à charger puis à sauvegarder est définie dynamiquement à partir de la valeur résultante de modulo traitée:
Le transformeur "Concatenate to relevant block" ajoute les nouvelles lignes à ce qui à déjà été sauvegardé lors des itérations précédentes:
%dw 2.0
output application/java
---
vars.bloc ++ payload.lines
La sauvegarde de la nouvelle version de l'enregistrement ne présente elle aussi, aucune difficulté:
Bon, l'heure de vérité sonné: lançons une exécution en munissant notre process de quelques logs, nous pouvons surveiller son déroulement et le temps nécessaire pour l'exécuter:
Sur mon portable personnel, tout ce qui a de plus banal aujourd'hui, le traitement du million de lignes (100 x 10k) prend donc 387 secondes, soit entre 6 et 7 minutes. Bien sûr, cela peut varier en fonction du temps nécessaire pour obtenir les checksums qui varie avec la taille des objets à traiter. Mais les ordres de grandeur sont là. Nous verrons dans le prochain post que l'essentiel du temps nécessaire pour l'ensemble du traitement est pris par ce découpage. Une telle méthode permet donc de détecter des évolutions sur de très grandes listes en quelques minutes !
Autre point intéressant est la taille des différents enregistrements. Nous pouvons constater que par rapport à la moyenne (50k objets par enregistrement), l'écart est au pire, ici de 1% (store 18) ! Sachant qu'un enregistrement Object Store est capable de stocker autour de 100k lignes réduites à id et un checksum, on est donc très, très loin de la zone de danger. Disposer d'une fonction qui distribue uniformément les valeurs des id prend bien toute sa pertinence.
Des esprits malins remarqueront qu'une partie de ce processus est en O(N2). En effet, au fur et à mesure que nous complétons les enregistrements Object Store, nous sommes obligés de de recharger des listes de plus en plus volumineuses (linéairement). En modifiant les logs afin d'afficher le temps pris par chaque étape, on obtient le résultat suivant:
En effet, petit à petit, le temps de traitement s'allonge, ce qui montre bien le caractère O(N2) du processus. Pour des dizaines ou des centaines de milliers de lignes, l'impact est faible. Il devient beaucoup plus sérieux dès qu'on dépasse le million de lignes. J'ai fait un essai avec 10 millions de lignes, que j'ai arrêté au milieu du traitement (autour de 5 millions de lignes ventilées sur 200 enregistrements). Voici les chiffres constatés:
Là, les choses deviennent rapidement prohibitives, sachant que là, il y a 1000 itérations de 10k lignes ! Cette fois, on parle d'heures ! D'autres mécanismes, encore plus sophistiqués doivent être mis en place. Pour un tel volume de données, cela se justifie pleinement.
_________________________________________________________________________
No more kidding. This time, we're going to see what we can do to synchronize lists containing tens or even hundreds of thousands of lines. In fact, in our example, the list will have a million lines! We'll start from the following assumptions:
- Lists cannot be loaded into memory in their entirety,
- Lists are not sorted (in our example, this will be the case, but we won't use this feature, on purpose).
Each object has an immutable identifier and a content that can vary. As usual, we'll be looking for the identifiers:
- objects created (objects appearing in the second list and not in the first)
- objects deleted (objects appearing in the second list and not in the first)
- modified objects (objects appearing in both lists, but whose content has been modified)
Let's look at the options available to us to compare the two lists.I can think of only two:
- browse both lists simultaneously, after sorting them according to their identifiers.By following the progression of identifiers, it's possible to sort objects according to their presence and the evolution of their content.
- cut the two lists into pieces of similar size, small enough to be loaded in their entirety and processed. In addition, the cutting boundaries must be strictly the same for both lists.
On the face of it, the first option is the most interesting, yet it's the second that we're going to prefer.The first has two prohibitive constraints, while the second's seemingly onerous constraints can be solved with a useful mathematical trick.
The first option presents two major problems:
- firstly, the lists need to be sorted. There is a method for this (sort), but it implies loading the entire list, which is contrary to the initial assumptions.
- secondly, the two lists need to be browsed simultaneously, without being loaded in their entirety. There is a mechanism for this: streaming.Yes, there is. But streaming can only be applied to one list, not both simultaneously!
So, if we reject the first option, how do we solve the constraints of the second? The idea is to split each list by applying a modulo to its identifier. To do this, we take moderate-sized packets (10k in our example), and divide them up into a predetermined number of groups (20 in our example). These groups are then added to the groups of previous packages, stored in Object Store records. This approach offers a number of advantages:
- only one list is processed at a time, during cutting: streaming is therefore applicable. The identification of evolutions only takes place at a later stage, in packages small enough to be loaded in their entirety
- no need to sort anything
- it's easy to control cuts and align them with the two lists
- implementation is simpler.
Certainly, but some will object - and rightly so - that all this only works if two assumptions prove true:
- identifiers are numeric
- the distribution of the numbers contained in these fields is roughly uniform (to ensure that the packets generated are of similar size)
Even if, in many cases, it is conceivable that these two characters could be verified, there is a way - the famous “mathematical trick” mentioned above - to guarantee this in all cases: simply apply the modulo not directly to the identifier, but to a checksum generated from this identifier, using an algorithm that ensures uniformity (such as MD5), and you're done!
MD5 will also come in handy for solving another problem. Remember we said we were going to store objects in Object Store records? Well, that's fine. But the size of these records is limited. What do we do if we have to browse lists of large objects? Worse still, how do we deal with the case where this size can vary greatly? And then, how are we going to compare the contents of two objects with the same identifier? field by field, recursively? Well, we have a unique and ingenious answer to all these questions: MD5. Rather than storing the objects in their entirety, with all the potential problems seen above, we'll just store their identifier and the checksum of their contents:
- the size of each record is completely controlled
- comparisons are ridiculously simple: just compare two strings!
I'd like to (re)reassure those who are worried about a checksum value telescoping: the risk is sinking into the infinitely small, in cosmic proportions (we're talking billions of billions of billions of years before that happens!).
Our flow is implemented as follows:
Note that this flow uses two nested loops:
- The first slices the input stream into packets of 10k lines.
- These 10k lines are grouped according to the MD5 modulo of their respective identifiers (transform ProcessList)
- The second loop iterates over the groupings to add them to the appropriate Object Stores records.
Most of the magic is in the “Process List” transform:
%dw 2.0
import dw::Crypto
import * from dw::core::Numbers
output application/json
var nbr = vars.attributes.nbr as Number
---
((payload
map (i)->{
id: i.id,
key: (fromHex(Crypto::MD5(write(i.id) as Binary)) mod nbr) as String,
value: Crypto::MD5(write(i) as Binary)
})
groupBy (i)->i.key)
pluck $
map (l)-> l reduce(r, a={key:l[0].key, lines:{}})->{
key:r.key,
lines:(a.lines ++ (r.id): r.value)
}
The storage part of Object Store is very simple. The only thing to note is that the key of the record to be loaded and then saved is defined dynamically from the resulting modulo value processed:
The “Concatenate to relevant block” transformer adds the new lines to what has already been saved in previous iterations:
%dw 2.0
output application/java
---
vars.bloc ++ payload.lines
Saving the new version of the recording is also straightforward:
Well, the moment of truth has arrived: let's launch an execution by providing our process with a few logs, so we can monitor its progress and the time it takes to execute it:
On my personal laptop, the most common of today's laptops, processing a million lines (100 x 10k) therefore takes 387 seconds, i.e. between 6 and 7 minutes. Of course, this may vary according to the time needed to obtain checksums, which varies with the size of the objects to be processed. But the orders of magnitude are there. We'll see in the next post that the bulk of the time required for the whole process is taken up by this breakdown. Such a method can therefore detect evolutions on very large lists in a matter of minutes!
Another interesting point is the size of the individual records. We can see that, compared with the average (50k objects per record), the deviation is at worst 1% (blind 18)! Given that an Object Store record is capable of storing around 100k lines reduced to id and checksum, we're very, very far from the danger zone. So it makes sense to have a function that evenly distributes id values.
Clever minds will notice that part of this process is O(N2). In fact, as we complete the Object Store records, we are obliged to reload ever larger lists (linearly). By modifying the logs to display the time taken by each step, we obtain the following result:
In fact, little by little, processing time increases, which clearly demonstrates the O(N2) nature of the process. For tens or hundreds of thousands of lines, the impact is slight. It becomes much more serious as soon as the number of lines exceeds one million. I did a test with 10 million lines, which I stopped in the middle of processing (around 5 million lines broken down into 200 records). Here are the figures I found:
This is where things quickly become prohibitive, bearing in mind that there are 1000 iterations of 10k lines! This time, we're talking hours! Other, even more sophisticated mechanisms need to be put in place. For such a volume of data, this is fully justified.