Parallélisation de traitements batchs

Contexte

Récemment, j’ai participé au développement d’un batch capable d’indexer dans le moteur de recherche Elasticsearch des données provenant d’une base de données tierce. Développé en Java, ce batch s’appuie sur Spring Batch, le plus célèbre framework de traitements par lot de l’écosystème Java
Plus précisément, ce batch est décomposé en 2 jobs Spring Batch, très proches l’un de l’autre :

  1. le premier est capable d’initialiser à partir de zéro le moteur de recherche
  2. et le second traite uniquement les mouvements quotidiens de données.

Problématique

Au cours du traitement batch, l’exécution de la requête par Oracle pour préparer son curseur a été identifiée comme l’opération la plus couteuse, loin devant la lecture des enregistrements en streaming à travers le réseau, leur traitement chargé de construire les documents Lucene à indexer ou leur écriture en mode bulk dans ElasticSearch. A titre d’exemple, sur des volumétries de production, la préparation côté serveur Oracle d’une requête SQL ramenant 10 millions d’enregistrement peut mettre jusqu’à 1h30.

Avec pour objectif que le batch passe sous le seuil de 2h à moindre coût, 2 axes d’optimisations ont été étudiés : diminuer le temps d’exécution par Oracle et diminuer le temps de traitement.

Solutions étudiées

Les optimisations d’un DBA consistant à utiliser des tables temporaires et des procédures stockées n’ont pas été concluantes : trop peu de gains (10 à 20%) pour une réécriture partielle de notre batch, et avec le risque d’engendrer des régressions.

Après mesures et calculs, l’utilisation de la pagination sur des plages de 100, de 1 000 ou même de 10 000 enregistrements a également été écartée. Dans notre contexte, cela aurait dégradé les performances. Le choix de rester sur l’utilisation d’un curseur JDBC a été maintenu.
A cette occasion, nous avons remarqué que les temps de mise en place d’un curseur Oracle pour préparer 1 millions ou 10 millions d’enregistrements étaient du même ordre de grandeur.

Utilisant déjà l’une des techniques proposées par Spring Batch pour paralléliser notre traitement batch, pourquoi ne pas refaire appel à ses loyaux services ?

Spring Batch et ses techniques de parallélisation

Comme indiqué dans son manuel de référence, Spring Batch propose nativement 4 techniques pour paralléliser les traitements :

  1. Multi-threaded Step (single process)
  2. Parallel Steps (single process)
  3. Remote Chunking of Step (multi process)
  4. Partitioning a Step (single or multi process)

Pour optimiser le batch, 2 de ces techniques ont été utilisées.

Le Remote Chunking of Step a été écarté d’office. Dans le contexte client, installer un batch en production est déjà forte affaire. Alors en installer plusieurs interconnectés, je n’ose pas me l’imaginer : à étudier en dernier recours.

Le Multi-threaded Step est sans doute la technique la plus simple à mettre en œuvre. Seule un peu de configuration est suffisante : l’ajout d’un taskExecutor sur le tasklet à paralléliser. La conséquence majeure est que les items peuvent être traités dans le désordre.
Un prérequis à cette technique est que les ItemReader et ItemWiter soient stateless ou thread-safe. La classe de JdbcCursorItemReader de Spring Batch hérite de la classe AbstractItemCountingItemStreamItemReader qui n’est pas thread-safe. L’utilisation d’un wrapper synchronized aurait pu être envisagée si la classe fille de JdbcCursorItemReader développée pour les besoins du batch ne s’appuyait pas elle-même sur un RowMapper avec état reposant sur l’ordre de lecture des éléments.

Les Parallel Steps ont été mises en œuvre dès le début du batch pour traiter en parallèle des données de types différents (ex : Musique et Film). De par leurs jointures, les requêtes SQL de chacun différaient. Avant optimisation, 9 steps étaient déjà exécutés en parallèle par ce biais.

Quatrième et dernière technique, celle du Partitioning a Step est la piste que nous avons étudiée pour diminuer le temps d’exécuter des 3 steps les plus longs. Elle consiste à partitionner les données selon un critère pertinemment choisi (ex : identifiant, année, catégorie), le but étant d’obtenir des partitions de taille équivalente et donc de même temps de traitement.
Bien qu’il ne fut pas parfaitement linéairement réparti, le discriminant retenu pour le batch a été l’identifiant fonctionnel des données à indexer. Les données ont été découpées en 3 partitions. Comme attendu, bien que le volume de données soit divisé par trois, le temps de mise en place du curseur Oracle ne diminua pas. Par contre, le temps de traitements fut divisé par 3, faisant ainsi passer le temps d’exécution du batch de 3h à 2h.
Malgré une augmentation du nombre de requêtes exécutées simultanément, la base Oracle n’a pas montré de faiblesse. Une surcharge aurait en effet pu ternir ce résultat.

Exemple de mise en œuvre

Après ce long discours, rien de tel qu’un peu d’exercice. Pour les besoins de ce billet, et afin de capitaliser sur l’expérience acquise sur la configuration Spring Batch, j’ai mis à jour le projet spring-batch-toolkit hébergé sur GitHub. Le fichier blog-parallelisation.zip contient l’ensemble du code source mavenisé.

Je suis parti d’un cas d’exemple des plus simples : un batch chargé de lire en base de données des chefs-d’œuvre puis de les afficher sur la console.

Modèle physique de données des tables MASTERPIECE, MUSIC_ALBUM et MOVIE

En base, il existe 2 types de chefs-d’œuvre : les films et les albums de musique. Comme le montre le diagramme du modèle physique de données ci-contre, chaque type de chef-d’oeuvre dispose de sa propre table : respectivement MOVIE et MUSIC_ALBUM. Les données communes sont normalisées dans la table MAESTERPIECE.

En ce qui concerne le design du batch, le job peut être décomposé en 2 steps exécutés en parallèle, l’un chargé de traiter les albums de musique, l’autre les films. Une fois les 2 steps terminés, un dernier step affiche le nombre total de chefs-d’œuvre traités.

Avec une volumétrie de film supérieure à celle des albums, le step des films est décomposé en 2 partitions exécutées en parallèle.

Le besoin est simple. Partons d’une démarche TDD et commençons par l’écriture d’un test d’intégration.

Dans un premier temps, attaquons-nous aux données de test, sans doute ce qu’il y’a de plus fastidieux. Exécuté au moment de la création de la base de données embarquée, le script SQL TestParallelAndPartitioning.sql contient les ordres DDL du schéma ci-dessous ainsi que des requêtes INSERT permettant de l’alimenter.

Voici un exemple de données de tests :

Au total, 11 albums et 8 films sont référencés.
La classe de tests TestParallelAndPartitioning repose sur Spring Test, Spring Batch Test et JUnit.

Comme le montre l’extrait de code suivant, la classe JobLauncherTestUtils issue de Spring Batch Test permet d’exécuter notre unique job sans avoir à lui passer de paramètres ainsi que d’attendre la fin de son traitement.

L’exécution du job est suivie d’assertions :

  1. Le job s’est terminé avec succès
  2. Le step des films stepLogMovie a traité les 8 films attendus
  3. Le step des albums de musiques stepLogMusicAlbum  a traité les 11 films attendus
  4. Et en y regardant de plus près, le step des albums a été décomposé en deux « sous-steps », stepLogMusicAlbumPartition:partition0 et stepLogMusicAlbumPartition:partition1 qui correspondent, comme leur nom l’indique, à chacune des 2 partitions. Les 11 films ont été séparés en 2 lots de capacités avoisinantes, à savoir de 6 et 5 films. Avec 3 partitions, on aurait pu s’attendre à un découpage de 4-4-3.

La configuration du batch commence par la déclaration de beans d’infrastructure Spring relativement génériques pour des tests :

  • Une base de données en mémoire H2 initialisée avec le schéma des 6 tables de Spring Batch
  • Le gestionnaire de transactions utilisé par Spring Batch pour gérer ses chunk
  • Le JobRespository dans lequel seront persistés l’historique et le contexte d’exécution des batchs
  • Les beans  SimpleJobLauncher et JobLauncherTestUtils permettant d’exécuter le job testé

Ces beans sont déclarés dans le fichier AbstractSpringBatchTest-context.xml :

La majeure partie de la configuration Spring est définie dans le fichier TestParallelAndPartitioning-context.xml d’où sont tirés les extraits suivants.
En plus du schéma nécessaire par le JobRepository persistant de Spring Batch, les 3 tables de notre exemple sont créées puis alimentées avec notre jeu de données comportant 19 chefs-d’œuvre :

Un pool de threads sera utilisé pour paralléliser le job . Ce pool est dimensionné à 4 threads : un thread pour chacun des 2 parallel steps + un thread pour chacun des 2 « sous-steps » correspondants aux 2 partitions.

 

Vient ensuite la déclaration du job Spring Batch. L’utilisation des balises split et flow permet de mettre en œuvre les Parallel Steps. Couplée avec l’attribut task-executor, l’enchainement des Steps référencés par les flows n’est alors plus linéaire.
Les 2 flows flowMovie et flowMusicAlbum sont exécutés en parallèle. Une fois ces 2 flows terminés, le step stepEnd terminera le job.

Composé d’un seul step (sans partition), la déclaration du flow flowMusicAlbum chargée de logger les films est la plus simple. De type chunk, le step a un reader utilisant un curseur JDBC pour itérer sur la liste des films. La classe BeanPropertyRowMapper permet d’effectuer le mapping entre les colonnes du ResultSet de la requête SQL et le bean java Movie ; il se base sur le nom des colonnes et le nom des propriétés du bean.

Le writer affiche les propriétés du bean Movie à l’aide de la méthode ToStringBuilder.reflectionToString() d’Apache Commons Lang.
L’attribut commit-intervaldu chunk est fixé volontairement à 2. Ainsi, le writer est appelé tous les 2 films. Cela permet de voir plus facilement l’enchevêtrement des différents threads.

Le flow chargé de traiter les films est lui aussi composé d’un seul step : stepLogMusicAlbum. Ce dernier est partitionné en 2 (propriété grid-size= »2″ du handler). Le même pool de threads est utilisé pour traiter les 2 partitions. Le bean chargé de partitionner les données est référencé : partitionerMusicAlbum. Le traitement des « sous-steps » partitionnés est confié au bean stepLogMusicAlbumPartition.

Le bean partitionerMusicAlbum repose sur la classe ColumnRangePartitioner reprise des samples Spring Batch La clé de partition doit lui être précisé sous forme du couple nom de table / nom de colonne.
Techniquement, cette classe utilise ces données pour récupérer les valeurs minimales et maximales de la clé. Pour se faire, 2 requêtes SQL sont exécutées. A partir, du min et du max, connaissant le nombre de partitions à créer (grid-size), elle calcule des intervalles de données de grandeur équivalente. Afin que les partitions soient de taille équivalente en termes de données, les valeurs des clés doivent être uniformément distribuées. C’est par exemple le cas avec un identifiant technique généré par une séquence base de données et pour lesquelles aucune donnée n’est supprimée (pas de trou). Les clés minValue et maxValue de chaque intervalle sont mises à disposition dans le contexte d’exécution de chaque « sous-step ».

De la même manière que son cousin stepLogMovie, le bean stepLogMusicAlbumPartition est composé d’un chunk tasklet. Celui-ci référence 2 beans définis dans la suite du fichier de configuration : readerMusicAlbum et anyObjectWriter, ce dernier étant déjà utilisé par le bean stepLogMovie.

Par rapport à celui en charge de la lecture des films, le bean readerMusicAlbum se démarque en 2 points :

  1. La requête SQL filtre non seulement les chefs-d’œuvre par leur genre (where genre=’Music’), mais également sur une plage d’identifiants (and  b.album_id >= ? and b.album_id <= ?) relatifs à la clé de partitionnement. Cette requête est donc dynamique. Basé sur un PreparedStatement JDBC, elle est exécutée autant de fois qu’il y’a de partitions à traiter.

Les 2 paramètres de la requête (symbolisés par un ?) sont évalués dynamiquement à partir du contexte d’exécution du step. Une Spring Expression Language (SPeL) est utilisée dans la définition du bean anonyme basé sur la classe ListPreparedStatementSetter. Ceci est permis grâce à la portée du bean reader qui est de type step (scope= »step »).

Après épuration des logs et ajout d’un Thread.sleep(50) dans la classe ConsoleItemWriter dans le but, voici le résultat de l’exécution du batch :

Ces traces confirment que le traitement des chefs-d’œuvre est équitablement réparti dans le temps et entre les différents threads, avec une alternance de films et d’albums de musique, et des albums des 2 partitions traités en parallèle.

Conclusion

Pour un effort minime, à peine quelques heures de développement, la durée d’exécution du batch a baissé de 33%, avec un débit avoisinant les 5 000 documents par secondes indexés dans ElasticSearch. Pourquoi donc s’en priver ?

La documentation Spring Batch doit être attentivement suivie pour ne pas tomber dans certains pièges liés à la parallélisassion. La documentation officielle, le livre Spring Batch in Action et maintenant ce billet devraient être des sources suffisantes pour comprendre et mettre en œuvre aux moins 2 des techniques proposées nativement par Spring Batch : Parallel Steps et Partitioning a Step.

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *