mercredi 15 janvier 2025

La fonction merge/ The merge function

Il existe, dans la plupart des langages non typés, une méthode qui permet de fusionner deux objets. Cette fusion consiste à reporter tous les champs d'un objet o2 pour les inclure dans l'objet source o1. Dataweave a cette particularité que l'on peut avoir au sein du même objet, des champs qui partagent le même nom. Ainsi:

%dw 2.0
output application/json
var p = {
"message": "Hello monde!",
"message": "Hello world!"
}
---
payload.*message

qui donne comme résultat:

[
"Hello monde!",
"Hello world!"
]

Nous allons voir que cela peut poser problème. Ainsi, prenons le cas d'un objet décrivant une personne avec son adresse et que nous désirons mettre à jour. Premier réflexe : utiliser l'opérateur "++" qui sur le papier, effectue cette fusion:

%dw 2.0
output application/json
var person = {
"firstname": "Harry",
"lastname": "Torkovsky",
"address1": "3 Hotton Street",
"address2": "Somewhere",
"postalcode": "1234"
}
---
person ++ {
"address1": "2 Avenue Hermonsa",
"address2": "Elsewhere",
"postalcode": "4321",
    "phone": "06872354"
}

Le résultat est un peu surprenant:

{
"firstname": "Harry",
"lastname": "Torkovsky",
"address1": "3 Hotton Street",
"address2": "Somewhere",
"postalcode": "1234",
"address1": "2 Avenue Hermonsa",
"address2": "Elsewhere",
"postalcode": "4321",
  "phone": "06872354"
}

Ce n'est clairement pas le comportement souhaité. Nous aurions voulu remplacer les champs concernant l'adresse. Pour cela, la solution la plus évidente est d'utiliser l'opérateur "--" en lui passant la liste des champs à remplacer. Ainsi, notre traitement commence par supprimer les champs partagés avant d'insérer les nouvelles versions:

%dw 2.0
output application/json
var person = {
"firstname": "Harry",
"lastname": "Torkovsky",
"address1": "3 Hotton Street",
"address2": "Somewhere",
"postalcode": "1234"
}
var repl = {
"address1": "2 Avenue Hermonsa",
"address2": "Elsewhere",
"postalcode": "4321",
"phone": "06872354"
}
---
person -- keysOf(repl) ++ repl

Cette fois, le résultat est conforme à nos attentes:

{
"firstname": "Harry",
"lastname": "Torkovsky",
"address1": "2 Avenue Hermonsa",
"address2": "Elsewhere",
"postalcode": "4321",
"phone": "06872354"
}

Problème résolu ? Ben... pas sûr. Si vous avez suivi les posts de ce blog, vous savez que l'opérateur "--" de complexité N(O2), peut poser des problèmes de performance. Donc, que ce passe-t-il si les objets à fusionner sont grands, voire très grands, genre: des champs par milliers ? Pour répondre à cette question, j'ai écrit deux implémentations de la méthode "merge". La première utilise "--", la seconde reconstruit l'objet par filtrage (voir post précédant pour de plus amples explications sur ces deux démarches) Nous allons les comparer:

%dw 2.8

output application/json

fun Now() = now() then (t)->(t as Number)*1000 + t.milliseconds

fun eval(prs) =

Now() then (t1)-> prs() then (r)->

log(Now() -t1)

then r

fun merge0(a, b)=(a -- keysOf(b)) ++ b

fun merge1(a, b)=(

(a pluck {k:$$, v:$}

filter isEmpty(b[$.k])

) ++ (b pluck {k:$$, v:$})

)

reduce (l, a={})->a ++ (l.k):l.v

---

[

eval(()->

merge0(vars.obj, vars.obj)

),

eval(()->

merge1(vars.obj, vars.obj)

)

]


vars.obj est un objet en implémentation JAVA (donc une hashmap) qui contient 25000 champs. "eval" exécute une lambda en loggant le temps qui a été nécessaire pour cela. Voici le résultat (le test a été exécuté quatre fois):

INFO ... DefaultLoggingService$: 30543

INFO ... DefaultLoggingService$: 651

INFO ... DefaultLoggingService$: 29536

INFO ... DefaultLoggingService$: 247

INFO ... DefaultLoggingService$: 28971

INFO ... DefaultLoggingService$: 115

INFO ... DefaultLoggingService$: 30943

INFO ... DefaultLoggingService$: 168


30 secondes pour l'implémentation qui utilise "--" contre une centaine de millisecondes pour l'autre, soir un rapport de 200 (pour 25000 lignes, donc). Cela confirme ce que nous savions. La messe est-elle dite ? Eh bien... c'est à nuancer. Car sur de petits objets, l'opérateur "--" et plus efficace ! Ainsi, nous allons utiliser nos deux implémentations sur de petits objets, mais de très nombreuses fois:

%dw 2.8

output application/json

fun Now() = now() then (t)->(t as Number)*1000 + t.milliseconds

fun eval(prs) =

Now() then (t1)-> prs() then (r)->

log(Now() -t1)

then r

fun merge0(a, b)=(a -- keysOf(b)) ++ b

fun merge1(a, b)=(

(a pluck {k:$$, v:$}

filter isEmpty(b[$.k])

) ++ (b pluck {k:$$, v:$})

)

reduce (l, a={})->a ++ (l.k):l.v

---

[

eval(()->

(0 to 10000) map merge0(vars.obj, vars.obj)

),

eval(()->

(0 to 10000) map merge1(vars.obj, vars.obj)

)

]


Les résultats montrent, grosso modo que l'opérateur "--" est cette fois le plus efficace. Il l'est d'un facteur 1,5 à 2. C'est moins important que pour les gros objets, mais ce peut être non négligeable si on parcourt des listes immenses à merger :

INFO ... DefaultLoggingService$: 1018

INFO ... DefaultLoggingService$: 1898

INFO ... DefaultLoggingService$: 444

INFO ... DefaultLoggingService$: 701

INFO ... DefaultLoggingService$: 194

INFO ... DefaultLoggingService$: 395

INFO ... DefaultLoggingService$: 218

INFO ... DefaultLoggingService$: 339


Noter que Dataweave propose une méthode "mergeWith" disponible à partir de la version 2.8 de DataWeave. L'existence de cette méthode pour l'instant me semble réduite ... à sa documentation (?). Je ne suis pas arrivé à la faire accepter par Mulesoft (même dans la version 4.8.0 du serveur).
_________________________________________________________________________

Most non-typed languages have a method for merging two objects. This merge consists in carrying over all the fields of an object o2 and including them in the source object o1. Dataweave's special feature is that, within the same object, you can have fields that share the same name. For example:

%dw 2.0
output application/json
var p = {
"message": "Hello monde!",
"message": "Hello world!"
}
---
payload.*message

which results in:

[
"Hello monde!",
"Hello world!"
]

We'll see how this can cause problems. Let's take the case of an object describing a person and their address, which we want to update. Our first instinct is to use the “++” operator, which, on paper, performs this merge:

%dw 2.0
output application/json
var person = {
"firstname": "Harry",
"lastname": "Torkovsky",
"address1": "3 Hotton Street",
"address2": "Somewhere",
"postalcode": "1234"
}
---
person ++ {
"address1": "2 Avenue Hermonsa",
"address2": "Elsewhere",
"postalcode": "4321",
    "phone": "06872354"
}

The result is a little surprising:

{
"firstname": "Harry",
"lastname": "Torkovsky",
"address1": "3 Hotton Street",
"address2": "Somewhere",
"postalcode": "1234",
"address1": "2 Avenue Hermonsa",
"address2": "Elsewhere",
"postalcode": "4321",
  "phone": "06872354"
}

This is clearly not the desired behavior.

We would have liked to replace the address fields.To do this, the most obvious solution is to use the “--” operator, passing it the list of fields to be replaced.In this way, our processing begins by deleting the shared fields before inserting the new versions:

%dw 2.0
output application/json
var person = {
"firstname": "Harry",
"lastname": "Torkovsky",
"address1": "3 Hotton Street",
"address2": "Somewhere",
"postalcode": "1234"
}
var repl = {
"address1": "2 Avenue Hermonsa",
"address2": "Elsewhere",
"postalcode": "4321",
"phone": "06872354"
}
---
person -- keysOf(repl) ++ repl

This time, the result is as expected:

{
"firstname": "Harry",
"lastname": "Torkovsky",
"address1": "2 Avenue Hermonsa",
"address2": "Elsewhere",
"postalcode": "4321",
"phone": "06872354"
}

Problem solved? Well... not sure. If you've been following the posts on this blog, you'll know that the “--” operator of complexity N(O2) can cause performance problems. So, what happens if the objects to be merged are large, or even very large, like thousands of fields? To answer this question, I've written two implementations of the “merge” method. The first uses “--”, the second reconstructs the object by filtering (see previous post for further explanation of these two approaches). Let's compare them:

%dw 2.8

output application/json

fun Now() = now() then (t)->(t as Number)*1000 + t.milliseconds

fun eval(prs) =

Now() then (t1)-> prs() then (r)->

log(Now() -t1)

then r

fun merge0(a, b)=(a -- keysOf(b)) ++ b

fun merge1(a, b)=(

(a pluck {k:$$, v:$}

filter isEmpty(b[$.k])

) ++ (b pluck {k:$$, v:$})

)

reduce (l, a={})->a ++ (l.k):l.v

---

[

eval(()->

merge0(vars.obj, vars.obj)

),

eval(()->

merge1(vars.obj, vars.obj)

)

]


vars.obj is a JAVA object (i.e. a hashmap) containing 25,000 fields.

“eval” executes a lambda, logging the time it took to do so.
Here's the result (the test was run four times):

INFO ... DefaultLoggingService$: 30543

INFO ... DefaultLoggingService$: 651

INFO ... DefaultLoggingService$: 29536

INFO ... DefaultLoggingService$: 247

INFO ... DefaultLoggingService$: 28971

INFO ... DefaultLoggingService$: 115

INFO ... DefaultLoggingService$: 30943

INFO ... DefaultLoggingService$: 168


30 seconds for the implementation that uses “--” versus a hundred milliseconds for the other, a ratio of 200 (for 25,000 lines). This confirms what we already knew. Is it all over? Well... not quite. For small objects, the “--” operator is more efficient! So we're going to use our two implementations on small objects, but lots and lots of times:

%dw 2.8

output application/json

fun Now() = now() then (t)->(t as Number)*1000 + t.milliseconds

fun eval(prs) =

Now() then (t1)-> prs() then (r)->

log(Now() -t1)

then r

fun merge0(a, b)=(a -- keysOf(b)) ++ b

fun merge1(a, b)=(

(a pluck {k:$$, v:$}

filter isEmpty(b[$.k])

) ++ (b pluck {k:$$, v:$})

)

reduce (l, a={})->a ++ (l.k):l.v

---

[

eval(()->

(0 to 10000) map merge0(vars.obj, vars.obj)

),

eval(()->

(0 to 10000) map merge1(vars.obj, vars.obj)

)

]


The results show, roughly speaking, that the “--” operator is the most efficient this time.

It is by a factor of 1.5 to 2. This is less important than for large objects, but can be significant if you're browsing huge merger lists:

INFO ... DefaultLoggingService$: 1018

INFO ... DefaultLoggingService$: 1898

INFO ... DefaultLoggingService$: 444

INFO ... DefaultLoggingService$: 701

INFO ... DefaultLoggingService$: 194

INFO ... DefaultLoggingService$: 395

INFO ... DefaultLoggingService$: 218

INFO ... DefaultLoggingService$: 339


Please note that Dataweave offers a “mergeWith” method available from DataWeave version 2.8 onwards. The existence of this method for the time being seems limited to its documentation (?). I haven't managed to get Mulesoft to accept it (even in server version 4.8.0).


vendredi 10 janvier 2025

Synchronisation d'une liste géante partie 1: découpage / Synchronizing a giant list part 1: cutting

 Fini de rire. Cette fois, nous allons voir ce que nous pouvons faire pour synchroniser des listes qui contiennent des dizaines, voire des centaines de milliers de lignes. D'ailleurs, dans notre exemple la liste aura un million de lignes ! Nous partirons des hypothèses suivantes:

  • Les listes ne peuvent pas être chargées en mémoire en entier,
  • Les listes ne sont pas triées (dans notre exemple, ce sera le cas, mais nous n'utiliserons pas cette caractéristique, à dessein.

Chaque objet est muni d'un identifiant immutable et d'un contenu qui lui, peut varier. Comme d'habitude, nous chercherons à obtenir les identifiants:

  • des objets créées (les objets figurants dans la seconde liste et pas dans la première)
  • des objets supprimés (les objets figurants dans la seconde liste et pas dans la première)
  • des objets modifiés (les objets figurants dans les deux listes, mais dont la partie contenu a été modifiée)

Regardons les options à notre disposition pour comparer les deux listes. Je n'en vois que deux:

  • parcourir simultanément les deux listes après les avoir triées selon leurs identifiants. En suivant la progression des identifiants, il est possible de sérier les objets selon leur présence et l'évolution de leur contenu.
  • découper les deux listes en morceaux de tailles similaires suffisamment réduites pour pouvoir être chargées en entier et traitées. Il faut de plus que les frontières de découpe soient strictement les mêmes pour les deux listes.

A priori, la première option est la plus intéressante et pourtant, c'est la seconde que l'on va lui préférer. La première présente deux contraintes rédhibitoires, alors que les contraintes de la secondes, à priori très lourdes peuvent être résolues à l'aide d'une astuce mathématique des plus utiles.

La première option présente deux problèmes majeurs:

  1. d'abord il faut trier les listes. Il existe bien une méthode pour cela (sort) mais elle implique de charger toute la liste, ce qui est contraire aux hypothèses de départ.
  2. ensuite, il faut parcourir simultanément les deux listes, sans les charger dans leur totalité. Il existe bien un mécanisme pour cela: le streaming. Certes. Mais le streaming ne peut s'appliquer qu'à une liste, pas aux deux simultanément !

Bon, si on écarte la première option, comment résoudre les contraintes de la seconde ? L'idée est de découper chaque liste en appliquant un modulo sur leur identifiant. On prend pour cela des paquets de taille modérée (10k dans notre exemple), et on les ventile en fonction en un nombre déterminé à l'avance (20 dans notre exemple) groupes. On ajoute ses groupes aux groupes des paquets précédents, stockés dans des enregistrements Object Store. Cette manière de faire présente de nombreux avantages:

  • on ne traite qu'une liste à la fois, lors de la découpe: le streaming est donc applicable. L'identification des évolutions n'a lieu que dans un second temps, par paquet suffisamment réduit pour pouvoir être chargés entièrement
  • nul besoin de trier quoi que ce soit
  • il est facile de maîtriser les  coupes et de les aligner sur les deux listes
  • l'implémentation est plus simple.
Certes, mais certains objecteront - avec raison - que tout cela ne marche que si deux suppositions se révèlent exactes:
  • les identifiants sont numériques
  • la distribution des nombres que contiennent ces champs sont à peu près uniformes (afin de s'assurer que les paquets générés soient de tailles similaires)
Même si dans beaucoup de cas, on peut imaginer que ces deux caractères puissent être vérifiés, il existe un moyen - la fameuse "astuce mathématique" dont il est question plus haut - pour le garantir dans tous les cas: il suffit d'appliquer le modulo non pas directement sur l'identifiant, mais sur un checksum généré à partir de cet identifiant, en utilisant  un algorithme qui en assure l'uniformité, (comme MD5) et le tour est joué !

D'ailleurs MD5 va nous être utile pour résoudre un autre problème. Rappelez vous, nous avons dit que nous allions stocker les objets dans des enregistrements Object Store. Très bien. Mais la taille de ces enregistrements est limitée. Que faire si l'on doit parcourir des listes d'objets de grandes tailles. Pire: comment gérer le cas ou cette taille peut beaucoup, beaucoup varier? Et puis, comment allons nous faire pour comparer le contenu de deux objets de même identifiant ? champ par champ, récursivement ? Et bien, à toutes ces questions, nous avons un unique et géniale réponde: MD5. Plutôt que de stocker les objets en entier avec tous les problèmes potentiels vus plus haut, nous allons juste stocker leur identifiant et le checksum de leur contenu:
  • la taille de chaque enregistrement est complètement maîtrisé
  • les comparaisons sonr ridiculement simples: il suffit de comparer deux chaînes de caractères!
Je tiens ici à (re)rassurer ceux qui s'inquiètent d'un téléscopage de la valeur d'un checksum: le risque s'enfonce dans l'infiniment petit, dans des proportions cosmiques (on parle de milliards de milliards de milliard d'années avant que çà n'arrive !).

Notre flux s'implémente de la façon suivante:

Noter que ce flux utilise deux boucles imbriquées:
  1. La première découpe le flux en entrée, par paquets de 10k lignes
  2. Ces 10k lignes sont regroupées en fonction du modulo du MD5 de leurs identifiants respectifs (transformer ProcessList)
  3. La seconde boucle itère sur les regroupements pour les ajouter aux enregistrements Object Stores idoines.
 L'essentiel de la magie est dans le transformer "Process List":

%dw 2.0

import dw::Crypto

import * from dw::core::Numbers

output application/json

var nbr = vars.attributes.nbr as Number

---

((payload

map (i)->{

id: i.id,

key: (fromHex(Crypto::MD5(write(i.id) as Binary)) mod nbr) as String,

value: Crypto::MD5(write(i) as Binary)

})

groupBy (i)->i.key)

pluck $

map (l)-> l reduce(r, a={key:l[0].key, lines:{}})->{

key:r.key,

lines:(a.lines ++ (r.id): r.value)

}


La partie conservation dans Object Store est très simple. La seule chose à noter est que la clé de l'enregistrement à charger puis à sauvegarder est définie dynamiquement à partir de la valeur résultante de modulo traitée:




Le transformeur "Concatenate to relevant block" ajoute les nouvelles lignes à ce qui à déjà été sauvegardé lors des itérations précédentes:

%dw 2.0

output application/java

---

vars.bloc ++ payload.lines

La sauvegarde de la nouvelle version de l'enregistrement ne présente elle aussi, aucune difficulté:



Bon, l'heure de vérité  sonné: lançons une exécution en munissant notre process de quelques logs, nous pouvons surveiller son déroulement et le temps nécessaire pour l'exécuter:



Sur mon portable personnel, tout ce qui a de plus banal aujourd'hui, le traitement du million de lignes (100 x 10k) prend donc 387 secondes, soit entre 6 et 7 minutes. Bien sûr, cela peut varier en fonction du temps nécessaire pour obtenir les checksums qui varie avec la taille des objets à traiter. Mais les ordres de grandeur sont là. Nous verrons dans le prochain post que l'essentiel du temps nécessaire pour l'ensemble du traitement est pris par ce découpage. Une telle méthode permet donc de détecter des évolutions sur de très grandes listes en quelques minutes !

Autre point intéressant est la taille des différents enregistrements. Nous pouvons constater que par rapport à la moyenne (50k objets par enregistrement), l'écart est au pire, ici de 1% (store 18) ! Sachant qu'un enregistrement Object Store est capable de stocker autour de 100k lignes réduites à id et un checksum, on est donc très, très loin de la zone de danger. Disposer d'une fonction qui distribue uniformément les valeurs des id prend bien toute sa pertinence.

Des esprits malins remarqueront qu'une partie de ce processus est en O(N2). En effet, au fur et à mesure que nous complétons les enregistrements Object Store, nous sommes obligés de de recharger des listes de plus en plus volumineuses (linéairement). En modifiant les logs afin d'afficher le temps pris par chaque étape, on obtient le résultat suivant:


En effet, petit à petit, le temps de traitement s'allonge, ce qui montre bien le caractère O(N2) du processus. Pour des dizaines ou des centaines de milliers de lignes, l'impact est faible. Il devient beaucoup plus sérieux dès qu'on dépasse le million de lignes. J'ai fait un essai avec 10 millions de lignes, que j'ai arrêté au milieu du traitement (autour de 5 millions de lignes ventilées sur 200 enregistrements). Voici les chiffres constatés:

Là, les choses deviennent rapidement prohibitives, sachant que là, il y a 1000 itérations de 10k lignes ! Cette fois, on parle d'heures ! D'autres mécanismes, encore plus sophistiqués doivent être mis en place. Pour un tel volume de données, cela se justifie pleinement.

_________________________________________________________________________

 No more kidding. This time, we're going to see what we can do to synchronize lists containing tens or even hundreds of thousands of lines. In fact, in our example, the list will have a million lines! We'll start from the following assumptions:

  • Lists cannot be loaded into memory in their entirety,
  • Lists are not sorted (in our example, this will be the case, but we won't use this feature, on purpose).

Each object has an immutable identifier and a content that can vary. As usual, we'll be looking for the identifiers:

  • objects created (objects appearing in the second list and not in the first)
  • objects deleted (objects appearing in the second list and not in the first)
  • modified objects (objects appearing in both lists, but whose content has been modified)

Let's look at the options available to us to compare the two lists.I can think of only two:

  • browse both lists simultaneously, after sorting them according to their identifiers.By following the progression of identifiers, it's possible to sort objects according to their presence and the evolution of their content.
  • cut the two lists into pieces of similar size, small enough to be loaded in their entirety and processed. In addition, the cutting boundaries must be strictly the same for both lists.

On the face of it, the first option is the most interesting, yet it's the second that we're going to prefer.The first has two prohibitive constraints, while the second's seemingly onerous constraints can be solved with a useful mathematical trick.

The first option presents two major problems:

  1. firstly, the lists need to be sorted. There is a method for this (sort), but it implies loading the entire list, which is contrary to the initial assumptions.
  2. secondly, the two lists need to be browsed simultaneously, without being loaded in their entirety. There is a mechanism for this: streaming.Yes, there is. But streaming can only be applied to one list, not both simultaneously!

So, if we reject the first option, how do we solve the constraints of the second? The idea is to split each list by applying a modulo to its identifier. To do this, we take moderate-sized packets (10k in our example), and divide them up into a predetermined number of groups (20 in our example). These groups are then added to the groups of previous packages, stored in Object Store records. This approach offers a number of advantages:

  • only one list is processed at a time, during cutting: streaming is therefore applicable. The identification of evolutions only takes place at a later stage, in packages small enough to be loaded in their entirety
  • no need to sort anything
  • it's easy to control cuts and align them with the two lists
  • implementation is simpler.
Certainly, but some will object - and rightly so - that all this only works if two assumptions prove true:
  • identifiers are numeric
  • the distribution of the numbers contained in these fields is roughly uniform (to ensure that the packets generated are of similar size)
Even if, in many cases, it is conceivable that these two characters could be verified, there is a way - the famous “mathematical trick” mentioned above - to guarantee this in all cases: simply apply the modulo not directly to the identifier, but to a checksum generated from this identifier, using an algorithm that ensures uniformity (such as MD5), and you're done!

MD5 will also come in handy for solving another problem. Remember we said we were going to store objects in Object Store records? Well, that's fine. But the size of these records is limited. What do we do if we have to browse lists of large objects? Worse still, how do we deal with the case where this size can vary greatly? And then, how are we going to compare the contents of two objects with the same identifier? field by field, recursively? Well, we have a unique and ingenious answer to all these questions: MD5. Rather than storing the objects in their entirety, with all the potential problems seen above, we'll just store their identifier and the checksum of their contents:
  • the size of each record is completely controlled
  • comparisons are ridiculously simple: just compare two strings!
I'd like to (re)reassure those who are worried about a checksum value telescoping: the risk is sinking into the infinitely small, in cosmic proportions (we're talking billions of billions of billions of years before that happens!).

Our flow is implemented as follows:

Note that this flow uses two nested loops:

  1. The first slices the input stream into packets of 10k lines.
  2. These 10k lines are grouped according to the MD5 modulo of their respective identifiers (transform ProcessList)
  3. The second loop iterates over the groupings to add them to the appropriate Object Stores records.

 Most of the magic is in the “Process List” transform:

%dw 2.0

import dw::Crypto

import * from dw::core::Numbers

output application/json

var nbr = vars.attributes.nbr as Number

---

((payload

map (i)->{

id: i.id,

key: (fromHex(Crypto::MD5(write(i.id) as Binary)) mod nbr) as String,

value: Crypto::MD5(write(i) as Binary)

})

groupBy (i)->i.key)

pluck $

map (l)-> l reduce(r, a={key:l[0].key, lines:{}})->{

key:r.key,

lines:(a.lines ++ (r.id): r.value)

}


The storage part of Object Store is very simple. The only thing to note is that the key of the record to be loaded and then saved is defined dynamically from the resulting modulo value processed:



The “Concatenate to relevant block” transformer adds the new lines to what has already been saved in previous iterations:

%dw 2.0

output application/java

---

vars.bloc ++ payload.lines

Saving the new version of the recording is also straightforward:



Well, the moment of truth has arrived: let's launch an execution by providing our process with a few logs, so we can monitor its progress and the time it takes to execute it:


On my personal laptop, the most common of today's laptops, processing a million lines (100 x 10k) therefore takes 387 seconds, i.e. between 6 and 7 minutes. Of course, this may vary according to the time needed to obtain checksums, which varies with the size of the objects to be processed. But the orders of magnitude are there. We'll see in the next post that the bulk of the time required for the whole process is taken up by this breakdown. Such a method can therefore detect evolutions on very large lists in a matter of minutes!

Another interesting point is the size of the individual records. We can see that, compared with the average (50k objects per record), the deviation is at worst 1% (blind 18)! Given that an Object Store record is capable of storing around 100k lines reduced to id and checksum, we're very, very far from the danger zone. So it makes sense to have a function that evenly distributes id values.

Clever minds will notice that part of this process is O(N2). In fact, as we complete the Object Store records, we are obliged to reload ever larger lists (linearly). By modifying the logs to display the time taken by each step, we obtain the following result:

In fact, little by little, processing time increases, which clearly demonstrates the O(N2) nature of the process. For tens or hundreds of thousands of lines, the impact is slight. It becomes much more serious as soon as the number of lines exceeds one million. I did a test with 10 million lines, which I stopped in the middle of processing (around 5 million lines broken down into 200 records). Here are the figures I found:

This is where things quickly become prohibitive, bearing in mind that there are 1000 iterations of 10k lines! This time, we're talking hours! Other, even more sophisticated mechanisms need to be put in place. For such a volume of data, this is fully justified.

mercredi 8 janvier 2025

Question de performances autour du traitement d'une liste/Performance issues around list processing

Question de performances, épisode 2. Nous avions vu dans un précédant post combien le choix de l'implémentation des objets manipulés (Java plutôt que Json). Nous allons voir ici que le choix de l'algorithme est tout aussi essentiel.

Vous allez me dire: évidemment qu'il faut choisir le "bon" principe algorithmique et nous en tenir à ceux dont la complexité est au maximum d'ordre O(N)log(N). Les choses ne sont pas si simple, car le langage utilisé par Mulesoft, Dataweave, est un langage fonctionnel, dont le fonctionnement présente quelques particularités difficilement anticipables.

Nous allons tester trois implémentations d'une fonctionnalité à priori simple: supprimer des champs d'un objet (ce qui est aussi supprimer les lignes d'une Hashmap ou ce qui en fait office dans Dataweave). La comparaison en terme de performance est éloquente.

La première implémenattion utilise l'opérateur "--". Elle supprime une partie des champs d'un objet. Le nom de ces champs figure dans une liste placée à droite de l'opérateur "--" (variable keys ici):

a -- (keys)

Le code a été coupé en deux parties (deux "Transform" dans le flot Mulesoft) afin de s'assurer que les objets manipulés soient effectivement implémentés sous la forme de HashMap Java, la version la plus performante d'un objet Dataweave. C'est la raison d'être du premier composant de transformation:

%dw 2.0

output application/java

fun Now() = now() then (t)->(t as Number)*1000 + t.milliseconds

fun eval(prs) =

Now() then (t1)-> prs() then (r)->

log(Now() -t1)

then r

---

eval(()->{

o1:(0 to 10000) reduce(v, a={})->a ++ (v as String):v,

k:keysOf((0 to 10000) reduce(v, a={})->

                    if ((v mod 4)==0) a else a ++ (v as String):v)

}

)


"o1" est la hashmap à laquelle on retirera un quart de ses lignes. Les clés sont stockées dans "k". Nous allons donc retirer 2500 lignes d'une collection qui en contient 10000. Le code pour le faire (second transformeur) à l'avantage d'être d'une extrême simplicité:


%dw 2.0

output application/java

fun Now() = now() then (t)->(t as Number)*1000 + t.milliseconds

fun eval(prs) =

Now() then (t1)-> prs() then (r)->

log(Now() -t1)

then r

---

eval(()->

payload.o1 -- payload.k

)


La méthode "eval" calcule le temps nécessaire pour exécuter un code (nombre donné en millisecondes). Que nous indiquent ces méthodes "eval" ? Eh bien çà:


INFO ... DefaultLoggingService$: 35

INFO ... DefaultLoggingService$: 3917


Quatre secondes, c'est considérable. Que ce passe-t-il si nous doublons la taille de l'essai ? Eh bien ça:


INFO ... DefaultLoggingService$: 35

INFO ... DefaultLoggingService$: 12587


Le premier transformeur s'exécute tellement rapidement que les chiffres affichés ne sont pas significatifs (il peut être très fortement impacté par le garbage collector par exemple).


Le problème s'aggrave considérablement, c'est à dire très au delà d'un rapport 2. Clairement, le fonctionnement sous-jacent de l'opérateur "--" est polynomial (O(N2+))).


Tentons une autre approche. Au lieu de supprimer tous les champs d'un coup (opérateur "--"), nous allons les supprimer un par un (operateur "-" ou key est le nom d'un champ):


a - key


Le premier transformeur est strictement le même. Toutes les différences dont dans la seconde snippet:


%dw 2.0

output application/json

fun Now() = now() then (t)->(t as Number)*1000 + t.milliseconds

fun eval(prs) =

Now() then (t1)-> prs() then (r)->

log(Now() -t1)

then r

---

eval(()->

payload.k reduce (k, a=payload.o1)-> a - k

)


Pour retirer 2500 lignes parmi 10000, nous obtenons les résultats suivants:


INFO ... DefaultLoggingService$: 47

INFO ... DefaultLoggingService$: 3376


A priori, le résultat est à peu près le même. Doublons la taille de l'essai (5000/20000) pour nous en assurer:


INFO ... DefaultLoggingService$: 37

INFO ... DefaultLoggingService$: 11849


Nous confirmons, les nombres sont du même ordre de grandeur. Le problème vient bien du fait qu'on cherche à retirer un champ d'une hashmap (j'ai bien vérifié o1 est bien une HashMap Java). Or retirer une ligne d'une telle structure est de complexité O(1). Difficile d'expliquer la quadratique...


Essayons quelque chose de très différent. Plutôt que retirer des lignes d'une hashmap, nous allons en fait en recréer une autre en filtrant les lignes qui n'appartiennent pas à une autre hashmap. Cette fois, il faut modifier les deux transformeurs. Le premier génère une liste de 10000 objets que l'on pourra filter (on ne peut filtrer un objet Dataweave/Hashmap Java) et l'objet Dataweave/Hashmap Java qui contient les champs à supprimer.


%dw 2.0

output application/java

fun Now() = now() then (t)->(t as Number)*1000 + t.milliseconds

fun eval(prs) =

Now() then (t1)-> prs() then (r)->

log(Now() -t1)

then r

---

eval(()->{

l1:((0 to 10000) reduce(v, a={})->

                    a ++ (v as String):v) pluck (v, k)-> { id:k, value:v },

o2:(0 to 10000) reduce(v, a={})->

                    if ((v mod 4)==0) a else a ++ (v as String):v

}

)


Le second transformeur filtre la liste et construit la hashmap avec les champs acceptés:


%dw 2.0

output application/java

fun Now() = now() then (t)->(t as Number)*1000 + t.milliseconds

fun eval(prs) =

Now() then (t1)-> prs() then (r)->

log(Now() -t1)

then r

---

eval(()->

payload.l1 reduce (r, a={})->

        if (isEmpty(payload.o2[r.id])) a ++ (r.id):r.value else a

)


L'exécution est beaucoup, beaucoup plus rapide:


INFO ... DefaultLoggingService$: 48

INFO ... DefaultLoggingService$: 77


Doublons l'essai pour mesurer l'impact que peut avoir la taille de l'essai:


INFO ... DefaultLoggingService$: 37

INFO ... DefaultLoggingService$: 72

On reste dans l'épaisseur du trait, l'impact n'est même pas perceptible. 

La meilleure solution étant évidente, tentons de trouver des raisons pour une telle différence. Je ne peux ici qu'avancer des hypothèses, en partant de ce que j'ai appris d'autres technologies utilisant une approche fonctionnelle. Avançons que la raison tient au principe d'immutabilité: une valeur ne peut pas être modifiée, la seule action autorisée et d'en recréer une autre qui reflète la modification demandée. Donc retirer une ligne d'une hashmap implique en fait d'en recréer une autre, plus courte d'une ligne. L'opération n'est donc pas anecdotique si cette hashmap est importante. Sachant cela, la complexité en O(N2) s'explique très bien.

Mais me diriez vous, suppression et ajout sont toutes deux des modifications et nous devrions avoir un comportement de même type en recréant l'objet. Or ce n'est pas le cas. Pourquoi ?

Evidemment, si l'immutabilité - louée de partout comme un puissant pattern de robustesse - s'accompagnait systématiquement d'un tel inconvénient, l'approche fonctionnel, dont elle est un élément essentiel, serait resté un sujet d'étude confiné au monde universitaire. Heureusement, il existe des "trucs" qui permettent de contourner les obstacles. Essentiellement, la plupart des langages fonctionnels implémentent les listes de manière récursives: L2 qui est L1 auquel on ajoute un élément E nouveau, peut être définie de la manière suivante: 




Il n'est donc plus nécessaire de recopier L1 ! (qui peut être référencée ailleurs sans danger puisqu'elle est immutable). Ajouter un élément à une liste n'est donc pas un algorithme en O(N) et donc construire la ligne n'est pas un algorithme en O(N2); mais juste O(N).

OK. Mais la, ce qui est généré est une hashmap, pas une liste. Eh bien, pas sûr. En fait la transformation de l'objet construit par notre snippet en une hashmap est probablement l'étape finale pour se conformer à la directive output application/java. Il est fort probable qu'en interne, un objet Dataweave en cours de construction soit aussi implémenté sous la forme d'une liste. Cela expliquerait d'ailleurs que la recherche d'un champ dans un objet Dataweave soit d'ordre O(N) et non d'ordre O(1) comme c'est le cas d'une hashmap Java.

Que déduire de tout ça: eh bien que décidément, Mulesoft nécessite une compréhension profonde de ses mécanismes pour pouvoir être utilisé efficacement. Dès qu'il faut traiter des objets ou listes de grande taille, on a toutes les chances de tomber dans un piège ou un autre et voir les performances s'effondrer, si on ne possède pas de base solides en algorithmie et en programmation fonctionnelle. Les erreurs ou approximation ne pardonnent pas. Pour le prouver, le prochain post concernera un flot capable de détecter des changements dans une liste d'un million de lignes. Spoiler: avec la bonne implémentation, on parle de secondes pour un tel traitement, au pire de minutes, pas d'heures !

_________________________________________________________________________

Performance issues, episode 2. In a previous post, we saw how important it is to choose the implementation of the manipulated objects (Java rather than Json). We'll see here that the choice of algorithm is just as essential.

You'll tell me: of course we have to choose the “right” algorithmic principle and stick to those whose complexity is of order O(N)log(N) at most. Things aren't quite so simple, however, because the language used by Mulesoft, Dataweave, is a functional language, with a few peculiarities that are difficult to anticipate.

We're going to test three implementations of an apparently straightforward functionality: deleting fields from an object (which is also deleting rows from a Hashmap, or what Dataweave calls a Hashmap). The comparison in terms of performance speaks for itself.

The first implementation uses the “--” operator. It deletes part of an object's fields. The name of these fields appears in a list to the right of the “--” operator operator (variable keys here):

a -- (keys)

The code has been split into two parts (two “Transforms” in the Mulesoft flow) to ensure that the objects manipulated are actually implemented in the form of a Java HashMap, the most powerful version of a Dataweave object. This is the purpose of the first transformation component:

%dw 2.0

output application/java

fun Now() = now() then (t)->(t as Number)*1000 + t.milliseconds

fun eval(prs) =

Now() then (t1)-> prs() then (r)->

log(Now() -t1)

then r

---

eval(()->{

o1:(0 to 10000) reduce(v, a={})->a ++ (v as String):v,

k:keysOf((0 to 10000) reduce(v, a={})->

                    if ((v mod 4)==0) a else a ++ (v as String):v)

}

)


“o1” is the hashmap from which we will remove a quarter of its lines. The keys are stored in “k”. So we're going to remove 2,500 lines from a collection containing 10,000. The code to do this (second transformer) has the advantage of being extremely simple:


%dw 2.0

output application/java

fun Now() = now() then (t)->(t as Number)*1000 + t.milliseconds

fun eval(prs) =

Now() then (t1)-> prs() then (r)->

log(Now() -t1)

then r

---

eval(()->

payload.o1 -- payload.k

)


The “eval” method calculates the time needed to execute a code (given in milliseconds). What do these “eval” methods tell us? Well, this:


INFO ... DefaultLoggingService$: 35

INFO ... DefaultLoggingService$: 3917


Four seconds is a long time. What happens if we double the trial size? Well, this:


INFO ... DefaultLoggingService$: 35

INFO ... DefaultLoggingService$: 12587


The first transformer runs so fast that the numbers displayed are not significant (it may be heavily impacted by garbage collection, for example).


The problem worsens considerably, i.e. far beyond a ratio of 2. Clearly, the underlying operation of the “--” operator is polynomial (O(N2+))).


Let's try another approach. Instead of deleting all the fields at once (“--” operator), we'll delete them one by one (“-” operator, where key is the name of a field:


a - key


The first transformer is strictly the same. All the differences are in the second snippet:


%dw 2.0

output application/json

fun Now() = now() then (t)->(t as Number)*1000 + t.milliseconds

fun eval(prs) =

Now() then (t1)-> prs() then (r)->

log(Now() -t1)

then r

---

eval(()->

payload.k reduce (k, a=payload.o1)-> a - k

)


To remove 2500 lines from 10000, we obtain the following results:


INFO ... DefaultLoggingService$: 47

INFO ... DefaultLoggingService$: 3376


A priori, the result is more or less the same. Let's double the test size (5000/20000) to be sure:


INFO ... DefaultLoggingService$: 37

INFO ... DefaultLoggingService$: 11849


We confirm that the numbers are of the same order of magnitude. The problem comes from the fact that we're trying to remove a field from a hashmap (I've checked that o1 is a Java HashMap). Removing a row from such a structure is O(1) complex. Hard to explain the quadratic...


Let's try something very different. Rather than removing rows from one hashmap, we'll actually recreate another by filtering out rows that don't belong to another hashmap. This time, we need to modify both transformers. The first generates a list of 10,000 objects that can be filtered (a Dataweave/Hashmap Java object cannot be filtered) and the Dataweave/Hashmap Java object containing the fields to be removed.


%dw 2.0

output application/java

fun Now() = now() then (t)->(t as Number)*1000 + t.milliseconds

fun eval(prs) =

Now() then (t1)-> prs() then (r)->

log(Now() -t1)

then r

---

eval(()->{

l1:((0 to 10000) reduce(v, a={})->

                    a ++ (v as String):v) pluck (v, k)-> { id:k, value:v },

o2:(0 to 10000) reduce(v, a={})->

                    if ((v mod 4)==0) a else a ++ (v as String):v

}

)


The second transformer filters the list and builds the hashmap with the accepted fields:


%dw 2.0

output application/java

fun Now() = now() then (t)->(t as Number)*1000 + t.milliseconds

fun eval(prs) =

Now() then (t1)-> prs() then (r)->

log(Now() -t1)

then r

---

eval(()->

payload.l1 reduce (r, a={})->

        if (isEmpty(payload.o2[r.id])) a ++ (r.id):r.value else a

)


Execution is much, much faster:


INFO ... DefaultLoggingService$: 48

INFO ... DefaultLoggingService$: 77


Let's duplicate the test to measure the impact of test size:


INFO ... DefaultLoggingService$: 37

INFO ... DefaultLoggingService$: 72

The impact is not even perceptible. 

The best solution being obvious, let's try to find reasons for such a difference. I can only speculate here, based on what I've learned from other technologies using a functional approach. Let's put forward that the reason lies in the principle of immutability: a value cannot be modified, the only action allowed is to recreate another one that reflects the requested modification. So removing a line from a hashmap actually means recreating another one, one line shorter. The operation is therefore not trivial if the hashmap is large. With this in mind, the complexity in O(N2) is easy to explain.

But you might say, deleting and adding are both modifications, and we should have the same type of behavior when recreating the object. But this is not the case. Why isn't this the case?

Obviously, if immutability - praised everywhere as a powerful robustness pattern - were systematically accompanied by such a drawback, the functional approach, of which it is an essential element, would remain a subject of study confined to the academic world. Fortunately, there are a number of “tricks” that can be used to overcome these obstacles. Essentially, most functional languages implement lists recursively: L2, which is L1 to which we add a new element E, can be defined as follows: 



So there's no need to copy L1! (which can safely be referenced elsewhere, since it is immutable). So adding an element to a list is not an O(N) algorithm, and building the line is not an O(N2) algorithm; it's just O(N).

OK. But then, what's generated is a hashmap, not a list. Well, not sure. In fact, transforming the object built by our snippet into a hashmap is probably the final step in complying with the output application/java directive. It's highly likely that a Dataweave object under construction is also implemented as a list. This would explain why the search for a field in a Dataweave object is of order O(N) and not of order O(1), as is the case with a Java hashmap.

What can we deduce from all this? Well, Mulesoft definitely requires a deep understanding of its mechanisms to be used effectively. As soon as you have to deal with large objects or lists, there's every chance of falling into one trap or another and seeing performance plummet, if you don't have a solid grounding in algorithmic and functional programming. Errors and approximations are unforgiving. To prove this, the next post will concern a stream capable of detecting changes in a list of a million lines. Spoiler: with the right implementation, we're talking seconds for such processing, at worst minutes, not hours.


Pourquoi ce blog ? / Why this blog?

Mulesoft est un ESB du monde Salesforce utilisé pour construire des flots permettant aux pièces logicielles d'un Système d'Informati...