Fini de rire. Cette fois, nous allons voir ce que nous pouvons faire pour synchroniser des listes qui contiennent des dizaines, voire des centaines de milliers de lignes. D'ailleurs, dans notre exemple la liste aura un million de lignes ! Nous partirons des hypothèses suivantes:
- Les listes ne peuvent pas être chargées en mémoire en entier,
- Les listes ne sont pas triées (dans notre exemple, ce sera le cas, mais nous n'utiliserons pas cette caractéristique, à dessein.
Chaque objet est muni d'un identifiant immutable et d'un contenu qui lui, peut varier. Comme d'habitude, nous chercherons à obtenir les identifiants:
- des objets créées (les objets figurants dans la seconde liste et pas dans la première)
- des objets supprimés (les objets figurants dans la seconde liste et pas dans la première)
- des objets modifiés (les objets figurants dans les deux listes, mais dont la partie contenu a été modifiée)
Regardons les options à notre disposition pour comparer les deux listes. Je n'en vois que deux:
- parcourir simultanément les deux listes après les avoir triées selon leurs identifiants. En suivant la progression des identifiants, il est possible de sérier les objets selon leur présence et l'évolution de leur contenu.
- découper les deux listes en morceaux de tailles similaires suffisamment réduites pour pouvoir être chargées en entier et traitées. Il faut de plus que les frontières de découpe soient strictement les mêmes pour les deux listes.
A priori, la première option est la plus intéressante et pourtant, c'est la seconde que l'on va lui préférer. La première présente deux contraintes rédhibitoires, alors que les contraintes de la secondes, à priori très lourdes peuvent être résolues à l'aide d'une astuce mathématique des plus utiles.
La première option présente deux problèmes majeurs:
- d'abord il faut trier les listes. Il existe bien une méthode pour cela (sort) mais elle implique de charger toute la liste, ce qui est contraire aux hypothèses de départ.
- ensuite, il faut parcourir simultanément les deux listes, sans les charger dans leur totalité. Il existe bien un mécanisme pour cela: le streaming. Certes. Mais le streaming ne peut s'appliquer qu'à une liste, pas aux deux simultanément !
Bon, si on écarte la première option, comment résoudre les contraintes de la seconde ? L'idée est de découper chaque liste en appliquant un modulo sur leur identifiant. On prend pour cela des paquets de taille modérée (10k dans notre exemple), et on les ventile en fonction en un nombre déterminé à l'avance (20 dans notre exemple) groupes. On ajoute ses groupes aux groupes des paquets précédents, stockés dans des enregistrements Object Store. Cette manière de faire présente de nombreux avantages:
- on ne traite qu'une liste à la fois, lors de la découpe: le streaming est donc applicable. L'identification des évolutions n'a lieu que dans un second temps, par paquet suffisamment réduit pour pouvoir être chargés entièrement
- nul besoin de trier quoi que ce soit
- il est facile de maîtriser les coupes et de les aligner sur les deux listes
- l'implémentation est plus simple.
- les identifiants sont numériques
- la distribution des nombres que contiennent ces champs sont à peu près uniformes (afin de s'assurer que les paquets générés soient de tailles similaires)
- la taille de chaque enregistrement est complètement maîtrisé
- les comparaisons sonr ridiculement simples: il suffit de comparer deux chaînes de caractères!
- La première découpe le flux en entrée, par paquets de 10k lignes
- Ces 10k lignes sont regroupées en fonction du modulo du MD5 de leurs identifiants respectifs (transformer ProcessList)
- La seconde boucle itère sur les regroupements pour les ajouter aux enregistrements Object Stores idoines.
%dw 2.0
import dw::Crypto
import * from dw::core::Numbers
output application/json
var nbr = vars.attributes.nbr as Number
---
((payload
map (i)->{
id: i.id,
key: (fromHex(Crypto::MD5(write(i.id) as Binary)) mod nbr) as String,
value: Crypto::MD5(write(i) as Binary)
})
groupBy (i)->i.key)
pluck $
map (l)-> l reduce(r, a={key:l[0].key, lines:{}})->{
key:r.key,
lines:(a.lines ++ (r.id): r.value)
}
%dw 2.0
output application/java
---
vars.bloc ++ payload.lines
_________________________________________________________________________
No more kidding. This time, we're going to see what we can do to synchronize lists containing tens or even hundreds of thousands of lines. In fact, in our example, the list will have a million lines! We'll start from the following assumptions:
- Lists cannot be loaded into memory in their entirety,
- Lists are not sorted (in our example, this will be the case, but we won't use this feature, on purpose).
Each object has an immutable identifier and a content that can vary. As usual, we'll be looking for the identifiers:
- objects created (objects appearing in the second list and not in the first)
- objects deleted (objects appearing in the second list and not in the first)
- modified objects (objects appearing in both lists, but whose content has been modified)
Let's look at the options available to us to compare the two lists.I can think of only two:
- browse both lists simultaneously, after sorting them according to their identifiers.By following the progression of identifiers, it's possible to sort objects according to their presence and the evolution of their content.
- cut the two lists into pieces of similar size, small enough to be loaded in their entirety and processed. In addition, the cutting boundaries must be strictly the same for both lists.
On the face of it, the first option is the most interesting, yet it's the second that we're going to prefer.The first has two prohibitive constraints, while the second's seemingly onerous constraints can be solved with a useful mathematical trick.
The first option presents two major problems:
- firstly, the lists need to be sorted. There is a method for this (sort), but it implies loading the entire list, which is contrary to the initial assumptions.
- secondly, the two lists need to be browsed simultaneously, without being loaded in their entirety. There is a mechanism for this: streaming.Yes, there is. But streaming can only be applied to one list, not both simultaneously!
So, if we reject the first option, how do we solve the constraints of the second? The idea is to split each list by applying a modulo to its identifier. To do this, we take moderate-sized packets (10k in our example), and divide them up into a predetermined number of groups (20 in our example). These groups are then added to the groups of previous packages, stored in Object Store records. This approach offers a number of advantages:
- only one list is processed at a time, during cutting: streaming is therefore applicable. The identification of evolutions only takes place at a later stage, in packages small enough to be loaded in their entirety
- no need to sort anything
- it's easy to control cuts and align them with the two lists
- implementation is simpler.
- identifiers are numeric
- the distribution of the numbers contained in these fields is roughly uniform (to ensure that the packets generated are of similar size)
- the size of each record is completely controlled
- comparisons are ridiculously simple: just compare two strings!
Note that this flow uses two nested loops:
- The first slices the input stream into packets of 10k lines.
- These 10k lines are grouped according to the MD5 modulo of their respective identifiers (transform ProcessList)
- The second loop iterates over the groupings to add them to the appropriate Object Stores records.
Most of the magic is in the “Process List” transform:
%dw 2.0
import dw::Crypto
import * from dw::core::Numbers
output application/json
var nbr = vars.attributes.nbr as Number
---
((payload
map (i)->{
id: i.id,
key: (fromHex(Crypto::MD5(write(i.id) as Binary)) mod nbr) as String,
value: Crypto::MD5(write(i) as Binary)
})
groupBy (i)->i.key)
pluck $
map (l)-> l reduce(r, a={key:l[0].key, lines:{}})->{
key:r.key,
lines:(a.lines ++ (r.id): r.value)
}
%dw 2.0
output application/java
---
vars.bloc ++ payload.lines
On my personal laptop, the most common of today's laptops, processing a million lines (100 x 10k) therefore takes 387 seconds, i.e. between 6 and 7 minutes. Of course, this may vary according to the time needed to obtain checksums, which varies with the size of the objects to be processed. But the orders of magnitude are there. We'll see in the next post that the bulk of the time required for the whole process is taken up by this breakdown. Such a method can therefore detect evolutions on very large lists in a matter of minutes!
Another interesting point is the size of the individual records. We can see that, compared with the average (50k objects per record), the deviation is at worst 1% (blind 18)! Given that an Object Store record is capable of storing around 100k lines reduced to id and checksum, we're very, very far from the danger zone. So it makes sense to have a function that evenly distributes id values.
Clever minds will notice that part of this process is O(N2). In fact, as we complete the Object Store records, we are obliged to reload ever larger lists (linearly). By modifying the logs to display the time taken by each step, we obtain the following result:
In fact, little by little, processing time increases, which clearly demonstrates the O(N2) nature of the process. For tens or hundreds of thousands of lines, the impact is slight. It becomes much more serious as soon as the number of lines exceeds one million. I did a test with 10 million lines, which I stopped in the middle of processing (around 5 million lines broken down into 200 records). Here are the figures I found:
This is where things quickly become prohibitive, bearing in mind that there are 1000 iterations of 10k lines! This time, we're talking hours! Other, even more sophisticated mechanisms need to be put in place. For such a volume of data, this is fully justified.
Aucun commentaire:
Enregistrer un commentaire