# Pratiques et préconisations pour les développeurs autour d'un broker de message jms. Le présent document a pour but de donner des éléments d'informations et des préconisations autour de l'utilisation d'un broker JMS. Ce document n'est pas exhaustif, il vient compléter l'offre mise en place, il y a quelques années par la DAAP dont on peut retrouver le [site documentaire de la DAAP](http://architecture-applicative.gitlab-pages.insee.fr/documentation/services_applicatifs/middleware/activemq/). Plan: 1. Notions générales 1.1 Au fait, c'est quoi un broker de message ? 1.2 Au fait, c'est quoi Jms? 1.3 Aller un peu plus loin dans les concepts: 2. L'offre historique 2.1 les 3 cas d'usage 2.2 Focus sur requete/reponse 3. Une utilisation du broker orienté évènement et asynchrone: 3.1 Quoi de mieux qu'un exemple 3.2 Les pièges de l'asynchronisme 4. Préconisations autour de springboot: 4.1 Les éléments de configuration springboot 4.2 Un exemple dans le cas particulier de requete/reponse. 5. Un mot sur event sourcing et Cqrs. 5.1 Cqrs 5.2 Event sourcing ## 1. Notions générales : ### 1.1 Au fait, c'est quoi un broker de message ? C'est une solution MOM (Message Oriented Middleware) qui permet une communication asynchrone entre deux ou plusieurs applicatifs. A quoi ca peut servir? 1. Connecter des systèmes qui ne seraient pas compatibles directement. 2. Répondre plus rapidement en déléguant des fonctions qui ne seraient pas prioritaires. 3. Envoyer des notifications à un ou plusieurs clients. **Pourquoi m'embeter avec un broker et ne pas utiliser des apis via HTTP simplement ?** Cette question est pertinente et le broker ne doit pas devenir un intermediaire obligatoire, au contraire. Il y a des bénéfices à n'utiliser que des échanges inter-applicatifs REST: * simple et connu * testable facilement via browser, swagger. * firewall friendly * simplifie l'architecture plutot que de mettre un broker au milieu il y a des inconvénients aussi: * Ne supporte que des interactions requetes/reponses en temps contraint. * Potentiellement réduit la disponiblité, il faut que le client et le service soit tout deux disponibles au moment de l'échange. * Les clients doivent connaitres les url des services ce qui peut amener à mettre en place une mécanique de service discovery. * Fetcher plusieurs ressources en un appel peut devenir une source de problème. C'est pourquoi, il n'est pas rare pour des produits/systèmes d'être ouvert sur l'extérieur en input/output via des apis et en interne avoir ses modules applicatifs reliés par un broker par exemple pour lisser l'activité ou améliorer la disponibilité en fonction des performances attendues. ![](https://i.imgur.com/gVJmSEh.png) ## 1.2. Au fait, c'est quoi jms? Java Message Service est une specification permettant à une application java d'interagir avec des MOM, à l'image de jdbc ou jpa pour les bases de données. Elle définit par exemple des entités comme: * un provider : souvent le framework du broker * un client Jms. * La structure d'un message (payload, header...). * La fabrique de connexion, queue (mode point à point) et topic (mode publish/subscribe). Un provider jms doit implémenter les spécifications et, comme pour le sql, charge au développeur d'utiliser les fonctionnalités JMS du broker et pas une fonctionnalité hors du scope de JMS que proposerait le broker sauf à être conscient de cette adhérence. ## 1.3. Aller un peu plus loin dans les concepts: ### 1.3.1 La notion de queue : ![](https://i.imgur.com/fVD8OW1.png) * Un module dit producteur contient une ou plusieurs instances d'objet producteur en charge d'envoyer des messages dans une queue. * Un module dit consommateur contient une ou plusieurs instances d'objet consommateur en charge de lire les messages d'une queue. Il est important de faire la distinction entre module applicatif et une instance d'un objet producteur ou consommateur. Par exemple : * Un batch monothread aura un consommateur, il n'y aura vraiment qu'un seul consommateur sur la queue visible sur le broker. * Un batch multithreadé peut avoir autant de consommateur que de thread, le batch est unique mais le consommateur non. Par défaut, la queue est de type first-in first out c'est à dire que la queue recoit des messages et les **délivre** dans l'ordre : * Si on a un consommateur uniquement les messages sont lus dans l'ordre de réception. * Si on a plusieurs consommateurs les messages sont délivrés dans l'ordre mais pas forcément traités dans l'ordre cela dépend du temps de traitement du message par exemple. Quand il y a plusieurs consommateurs il rentre en compétition vis à vis de la queue. Evidemment, un module applicatif consommateur ne lit pas forcément qu'une queue, un consommateur lui n'en lit qu'une. PLusieurs paramètres structurants sont de la responsabilité de l'applicatif: * **le nombre de consommateur** par exemple 1 consommateur garantie l'ordre des messages. * le **temps moyen de traitement d'un message**(performance de l'application) on verra par la suite les impacts que ca a. * la manière dont le consommateur prefetch les messages dans la queue : la **prefetch policy**, un objet consommateur peut prefetcher un ensemble de message par lot vis à vis du broker. * la manière dont le broker et le consommateur valident la bonne consommation du message via le **mode d'acknowledgment**: * Défini par le client il en existe 3 : AUTO, DUPS_OK, CLIENT. * de la **QoS sur le message**, on peut positionner sur le message des paramètres de QoS parmi : * une **priorité** (qui joue donc sur l'ordre first-in first-out) * un **timeToLive** (en fonction de la performance/disponiblité des consommateurs le message peut ne jamais etre traité, avec un ttl il peut expirer et aller par défaut dans la DLQ). * un mode de **persistence** (si le broker le permet c'est le cas à l'Insee, le message peut être persisté ou pas sur le broker, nécessaire selon votre use case pour la garantie de transmission du message en cas de failover ou redémarrage). ### 1.3.2 La notion de topic : ![](https://i.imgur.com/18WtUxM.png) Contrairement à une queue qui s'assure qu'un message est délivré en point à point à un consommateur, le topic permet de publier un message et de s'assurer qu'il est délivré à n consommateurs. ![](https://minio.stable.innovation.insee.eu/hackmd-uploads/uploads/upload_d83299dca17b7af113c10ba157193c34.png) il faut différencier les consommateurs de topic durable et non durables. * durables alors si un consommateur souscrit à un topic et est absent il retrouve les messages envoyés pendant son absence. * Avantage : un fil d'abonnement complet * Inconvénient : en cas de longue absence le topic se rempli jusqu'a une limite à maitriser. * La QoS s'appliquant sur le message, le producteur peut mettre un ttl pour éviter de stocker des messages jusqu'a explosion du broker, sinon on peut paramétrer sur le topic un moyen de supprimer les abonnés durables inactifs depuis trop longtemps. http://activemq.apache.org/manage-durable-subscribers.html * Il est possible d'unsubscribe via le code applicatif ( si les programmes abonnés sont arrétés). * non durables alors ca s'apparente à un broadcast réseau, seuls les consommateurs actuellement connectés recoivent le message. ## 1.3.3 Variantes autour des topics et queues : En plus de ces 2 notions structurantes, on peut faire porter par le broker des comportements pour des raisons fonctionnelles ou de performances. En voici quelques uns, même si l'on privilégiera, quand c'est possible, de ne pas mettre de logique dans le broker. ### 1.3.3.1 Gestion de l'ordre des messages : Une application peut avoir besoin de traiter les messages dans l'ordre. La solution la plus simple, si possible, est alors de ne pas multithreader ses programmes et de passer sur un consommateur exclusif. http://activemq.apache.org/exclusive-consumer Cela n'est pas toujours possible en terme de performance de n'avoir qu'un consommateur. Dans ce cas, on peut se tourner vers le partitionnement de la consommation de message. ### 1.3.3.2 Partitionnement de la consommation des messages : Il est possible en ajoutant un attribut JMSXGroupID que le broker envoit toujours sur le meme consommateur les messages portant le meme JMSXGroupID afin de load balancer des partitions de message. https://activemq.apache.org/message-groups Cela a des avantages et des inconvénients mais peut etre utile si le fonctionnel s'y prete. Notamment en terme de cache par exemple car on sait qu'on va pouvoir profiter d'un cache applicatif. Fonctionnalités a priori lié à JMS2 et non activeMq ``` JMSXGroupID and JMSXGroupSeq are standard properties clients should use if they want to group messages. All providers must support them. ``` ### 1.3.3.2 Selector : D'une manière générale, un selector est une manière pour un consommateur de ne traiter que les messages vérifiants les critères du selector sur les metadata d'un message pour une queue/topic donné. C'est donc aussi une manière d'orienter/regrouper les messages. Par contre, le sélecteur n'est pas dynamiquement changeable. Il doit respecter la syntaxe SQL92. Un usage classique est d'ajouter un header sur un message et l'envoyer sur un consommateur en charge de ce header, si nécessaire. C'est une alternative pour partitionner soit même la consommation. ## 2. L'offre historique : Historiquement, le broker à l'Insee est proposé pour 3 cas d'usage, dont un est majoritairement utilisé. ### 2.1 Les 3 cas d'usages : http://architecture-applicative.gitlab-pages.insee.fr/documentation/services_applicatifs/middleware/activemq/ 2 cas d'usage se ressemblent fortement et sont orientés sur l'utilisation du broker comme une solution de securisation pour remonter de l'information de la DMZ à l'interne. **requete/reponse:** Interroger un applicatif interne en un temps contraint pour restituer une réponse sur l'extérieur. S'apparente presque à du RPC, c'est l'utlisation majoritaire fait pour BRPP, FranceConnect, Sirene3 notamment. **requete/action :** Demander une action sur un applicatif interne sans réponse. **Topic:** Notifier des clients. ### 2.2 Focus sur le cas d'usage requete/reponse : C'est celui le plus utilisé, une api en dmz doit restituer une information en interrogeant un module applicatif en interne, le broker faisant passe plat. A l'Insee ca donne le schéma suivant : ![](https://i.imgur.com/gtT2Uc8.png) 1. Une requete arrive sur l'api et le F5 redirige vers un des deux couloirs actifs. 2. Le serveur applicatif mobilise un thread http parmi son pool (200 pour tomcat), il positionne un JmsCorrelationId sur le message (identifiant) et envoie un message dans queue.applis.requests.Dans la foulée de l'envoie le thread http crée un consommateur au sens objet abonné a queue.appli.responses avec un selector sur l'identifiant précédemment crée. Le thread http est alors bloqué en attente d'une réponse pour ce message. 3. Le message est dans la file d'attente et est consommé par un des 3 services internes jar lancés en démon. 4. le service interne renvoie un message de réponse dans queue.appli.responses en mettant un jmsCorrelationId identique au message de requete. 5. le thread http et son consommateur recoive le message en question, le thread http est restitué à son pool. 6. La réponse http est restituée au client. **Avantages :** * Ca marche. **Inconvénients :** * Ca ne respecte pas les best practices pour la performance d'activemq [ici](https://activemq.apache.org/how-should-i-implement-request-response-with-jms#:~:text=The%20best%20way%20to%20implement,request%20messages%20to%20response%20messages.) * En effet, on a un objet consommateur dédié pour chaque requete http, le broker et l'applicatif doivent déjà générer du dialogue réseau pour négocier l'enregistrement d'un consommateur puis le désenregistrer dans un temps contraint http généralement court < 500ms. * Pour une queue avec 2 tomcats actif le broker peut se retrouver à gérer 400 consommateurs à un instant t. **Pièges à éviter :** * Bien souvent en http si la réponse n'arrive pas on a un timeout, il faut donc s'assurer que ce timeout http existe( 60s sur tomcat). * Sans timetolive des messages et si les consommateurs font défaut (éteint ou lent), la queue de requete risque de grossir jusqu'a sa limite. * Avec timetolive des messages, il est important de coordonner les 2 valeurs timetolive sur le message et receiveTimeout le temps d'attente du consommateur du thread http qui attend la réponse. * Pour préserver ce fonctionnement il faut positionner de manière cohérente : * le timeout http * le timetolive des messages * le temps d'attente du consommateur qui bloque la requete http (receiveTimeout en jms) * un rate-limit facultatif Idéalement, il faut connaitre le temps moyen de traitement d'un message par les consommateurs pour s'assurer que les consommateurs sont bien dimensionnés au regard de l'activité http attendue. Si ce n'est pas le cas le timetolive permettra d'éviter au queue de grossir indéfiniment. il faut positionner receiveTimeout > timetolive + un majorant traitement message. En effet, en cas de surcharge dans la queue de requete, si le broker nettoie les messages ayant expirés, vous êtes certains que les messages que vont traiter vos démons sont au bord de l'expiration et vous devez laisser un delta un peu plus grand au consommateur coté requete http. Idéalement, positionner un ratelimit en accord avec les performances des consommateurs permet de s'éviter des soucis. Sinon de monitorer applicativement le nombre de message dans la queue avec la méthode browse par exemple afin de refuser toute nouvelles requete sur l'api si la queue dépasse un seuil. Enfin sachez que les messages expirés par défaut vont dans une dead letter queue sur laquelle il faut avoir un oeil car son activité reflète un dysfonctionnement quelque part. Pour les nouvelles applications, nous avons proposé une petite implémentation différente de ce même cas d'usage dans le poc ici. Des tests au CEI (avec ce qu'il était possible de voir via centreon...) montrent bien qu'avec une implémentation moins stressante le même débit est atteint avec 2 fois moins de cpu pour le broker. Nous reviendrons sur cette implémentatoin un peu plus bas. ## 3. Une utilisation du broker orienté évènement et asynchrone: Nous avons vu que: * Dans les cas simples rester sur de l'échange d'api rest quand c'est possible est probablement un meilleur choix qu'introduire un broker. * L'utiliser pour sécuriser des échanges entre zones a un cout et complexifie les architectures. C'est pourquoi nous proposons de replacer le broker dans le cadre d'utilisation orientée évènement. **Pourquoi parlé d'évènement et d'asynchronisme?** C'est la raison première d'un broker de découpler des modules applicatifs en garantissant une délivrance asynchrone, cela peut permettre de gérer un flux continu d'information la ou souvent on privilégie des traitements batchs ordonnancés sur échange de fichier. ### 3.1 Quoi de mieux qu'un exemple : Attention, plagiat : on reprend des éléments macros d'un livre qui donne des élémens bien plus précis : microservices pattern edition manning. Imaginons un système de commandes de menu pour des restaurants découpé ainsi: - Un **order service** en charge des commandes - Un **restaurant service** en charge de maintenir les infos sur les restaurants. - Un **customer service** en charge de la base client. - Un **kitchen service** en charge de la préparation des commandes. - Un **Accounting service** en charge de la facturation et paiement. - Un **delivery service** en charge de la livraison. ### 3.1.1 La disponibilité du sychrone : ![](https://i.imgur.com/Xvlg5hd.png) 1. Un client via une application demande à passer une commande 2. l'order service récupère l'information sur le client 3. l'order service récupère l'informations sur le restaurant 4. Il crée la commande dans sa base et répond au client que la commande est enregistrée. Pour que ca fonctionne, le choix du REST comme échange interapplicatif ne faisant que du synchrone il faut que tous les systèmes soient disponibles. La disponibilité d'un système distribué étant le produit de la disponibilité de ses composants, plus il y aura d'interaction et de services mis en jeu, plus la disponibilté du service global diminuera. Attention, ce n'est pas lié à REST, le broker utilisé pour request/reponse dans l'offre historique n'améliore pas la disponibilté du service puisque l'échange entre l'api en dmz et le module applicatif interne est contraint et synchrone. Dans le choix d'une architecture full REST se posera les questions de : * Retry en cas d'échec, * Circuit breaker en cas de forte latence * Probablement un mécanisme de découverte de service (meme si le F5 joue ce role en partie à l'Insee). Commme on l'a dit préalablement ca reste un choix pragramatique. ### 3.1.2 La disponibilité de l'asynchrone : Pour augmenter la disponibilité du service, il faut éliminer des échanges synchrones et gagner en disponibilité /réactivité, on pourrait imaginer: 1. Eliminer du synchrone en répliquant la donnée utile pour qu'order service seul soit capable d'une première réponse synchrone. Si nécessaire, il peut s'abonner aux notifications des modifications sur les services clients et restaurants en répliquant les parties utiles dans sa base. 2. Découper le workflow en étape asynchrone. Si order service a la donnée utile sur restaurant et client pour effectuer un premier enregistrement de la commande alors il peut de manière synchrone créer une commande à l'état PENDING et reposer sur un workflow asynchone pour faire évoluer l'état de la commande ensuite par exemple en demandant la validation du client et la validation du restaurant avant plus tard de continuer le workflow jusqu'a la livraison. ![](https://i.imgur.com/jnlwEdA.png) Ca pourrait donner le schéma suivant pour le workflow : ![](https://i.imgur.com/zt3y4Sp.png) Et les 2 queues ou topics pour que l'order service s'abonne aux évènement qui ont modifié l'état des restaurant ou des clients afin qu'il en réplique la partie utile qui le concerne. ![](https://i.imgur.com/bSbELfd.png) Le workflow peut etre plus complexe que seulement la sollicitation de 2 briques donnés pour l'exemple. En synthèse, dans un système distribué: * La choix de la communication entre les services jouent un role clé. * La conception et le fonctionnel peut etre impactés par ces choix. * Il y a un curseur entre la simplicité et la disponibilité. ### 3.2 Les pièges de l'architecture orientée évènement: Pas de magie, cette facon de procéder vient avec ses pièges : #### 3.2.1 Gestion des transactions : Il faut s'assurer de l'atomicité soit : * d'une action en base et de l'envoie d'un ou plusieurs messages. (exemple l'order service crée la commande et envoie un message de demande de validation du client et un de demande de validation du restaurant) * d'une réception de message et de l'enregistrement en base. (exemple un restaurant met à jour une data utile, notifie order service qui lit l'évènement et met à jour ses données répliquées utiles). #### 3.2.2 Une approche utilisateur asynchrone: L'idéal est alors de concevoir une application ou l'utilisateur est notifié de l'avancement asynchrone de sa demande même très rapide. Exemple : * Le client émet une commande il voit qu'elle est pending. * Sauf à avoir un système de websocket pour que son écran se mette à jour au fur et à mesure de l'avancement il peut recevoir un mail ou réactualiser sa page pour avoir l'état de sa commande dans le workflow à l'instant t. Il n'est pas rare pourtant que l'on souhaite quand le workflow est court avoir la réponse totale de manière synchrone. * Le client émet une commande et si la commande est validée il a directement la réponse. Cette possibilité d'implémenter une réponse synchrone sur un workflow interne asynchrone est mis en exemple dans le projet https://gitlab.insee.fr/animation-developpement/etudes-techniques/broker/ws-producteur. #### 3.2.3 Eventual Consistency : Comme le veut le thérème CAP (Consistency, Availability, Partition) si l'on choisit de partitionner son système et d'augmenter la disponibilité, on perd en consistence. A un instant t, le système n'est pas forcément consistent, notamment il se peut que des messages soient en attente et donc que les différents services ne possèdent pas le même niveau d'information. Il faut faire avec et avoir une conception du workflow qui l'accepte ainsi que des programmes performants pour que ce ne soit très limité dans le temps. On peut aussi avoir à recourrir à des programmes de consolidation entre les systèmes ou de réinitialisation. Par exemple, on peut : - prévoir un endpoint qui renotifie tous les restaurants à l'order service ou seulement ceux qui ont été modifiée dans la journée... ## 4. Préconisations autour de springBoot jms : On propose, car ca s'inscrit bien dans la tendance des applicatifs Insee, de reposer sur les librairies spring-jms ou spring-boot-starter-jms qui ont l'avantage: * De mettre en avant les entités de la specification Jms ce qui favoriserait le changement de solution (si un jour c'était nécessaire). * D'être configuré pour les utilisations standards qui sont fait du broker dans l'état de l'art. * D'etre documenté et plutot présent sur internet. Historiquement, pour interagir avec le broker on avait des services internes sous forme jars lancés en démon, on se propose de considérer que dorévanant tout est api et de voir un service autonome un peu de cette façon : ![](https://i.imgur.com/9ocKEgx.png) Une couche de service métier aux périmètres définis ouverte sur l'extérieur par : * une api REST * si c'est nécessaire des connecteurs jms en entrée/sortie envoyé. * l'essentiel du temps sa base de donnée pour la persistence. C'est pourquoi nous avons proposé qu'il soit dorénavant possible pour un serveur applicatif, peu importe le use case qu'il endosse, d'interagir avec le broker en produisant ou consommant des messages. ### 4.1 Les éléments de configuration springboot : En repartant du schéma précédent le découpage en couche service, repository et la mise en place des éléments de configuration de spring-mvc pour déployer une api REST est normalement un sujet déjà connu. Nous allons mettre l'accent sur la partie JMS. ### 4.1. Configuration de base : Un bean annoté @Configuration qui doit faire apparaitre : * Une **ConnectionFactory** : comme son nom l'indique elle est en charge de créer les connexions et a une adhérence du coup au broker utilisé ``` @Bean public ConnectionFactory connectionFactory() { return new ActiveMQConnectionFactory(userName, password, brokerUrl); } ``` * Une **jmsTemplate** : par analogie avec la jdbcTemplate elle permet l'interaction avec les queues ou topics et notamment l'envoi de message (outbound adapter), on peut paramétrer dessus son fonctionnement par défaut dès lors qu'elle enverra un message en surchargeant le timetolive du message, la QoS, la persistence ou non du message. ``` @Bean public JmsTemplate jmsTemplate() { JmsTemplate jmsTemplate = new JmsTemplate(); jmsTemplate.setConnectionFactory(connectionFactory()); if (timeToLive > 0) { logger.info("timetolive set to {}", timeToLive); jmsTemplate.setExplicitQosEnabled(true); jmsTemplate.setTimeToLive(timeToLive); if (producerProperties.getPersistenceEnable()) { jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT); } else { jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } } return jmsTemplate; } ``` Attention si cette template doit interagir avec un topic il faut ajouter : ``` jmsTemplate.setPubSubDomain(true); ``` * un **JmsListenerContainerFactory** si votre applicatif consomme des messages (inbound adapter) on se propose de garder l'implémentation par défaut de springboot : ``` @Bean public JmsListenerContainerFactory<?> consumerJmsListenerContainerFactory( MessageListenerContainerProperties messageListenerContainerProperties, DefaultJmsListenerContainerFactoryConfigurer configurer) { DefaultJmsListenerContainerFactory factory = new DefaultMessageListenerContainer(); configurer.configure(factory, connectionFactory()); return factory; } ``` Attention si ce JmsListenerContainerFactory doit consommer un topic il faut ajouter : ``` factory.setPubSubDomain(true); ``` Le configurer n'est pas anodin, il fait porter notamment les mécanismes suivants au DefaultJmsListenerContainerFactory dont la javadoc est plein de préconisations : * Mode d'acknowledgement : AUTO * sessionTransacted : true Avec cette configuration, les messages consommés sont acknowledgés lors du commit de la transaction jms. Nous y revenons dans la partie juste en dessous. ## 4.1.1 Utilisation pour qu'un service métier consomme un message : Il suffit de créer un bean qui présente une méthode public annotée par @JmsListener. l'annotation prend : * **destination :** le nom de la queue ou du topic que la méthode écoute. * **containerFactory:** le nom d'un bean JmsListenerContainerFactory définir dans la configuration. * **concurrency :** le nombre de thread lancable en parallèle pour ce listener. Si 1 il n'y a qu'un thread. Si 20, par exemple, il y aura 20 threads et donc 20 consommateurs en concurrence. Si 1-20 springboot scalera le pool en fonction de l'activité. Notez bien ici que ca a un impact sur l'ordre de consommation des messages comme déjà mentionné plus haut. De plus, mettre 1 vous garantira un thread pour votre serveur applicatif mais si vous avez 3 instances du serveur qui tourne vous aurez 3 threads en concurrence, si la nécessité d'un exclusiveConsumer est avérée, il faut se tourner vers le paramètre consumer.exclusive=true pour que le broker se charge de n'envoyer qu'un a un consommateur. Sachez que le pool de taskExecutor est surchargeable. Par défaut, c'est un SimpleAsyncTaskExecutor qui en cas de shutdown s'éteint gracefully. Dans le cadre des fenetres de service au CEI, le tomcat s'éteint de la facon suivante : 1. une demande d'arret graceful, la méthode n'accepte alors plus de message vis a vis du broker. 2. au bout d'1min30 un arret brutal est demandé potentellement le message est perdu si le traitement n'était pas terminé. **Gestion des erreurs** : * La configuration par défaut du configurer engendre un acknowledgement auprès du broker lors du commit. Toute exception non catchée dans la méthode provoquera un rollback de la session jms qui executera la redelivery policy configurable sur la connexion. Par défaut 5 essais et sinon message en DLQ. * Le plus simple quand c'est possible est de catcher toutes les exceptions et de trouver une réponse fonctionnelle en déclencheant soit: * une persistence en base d'échec fonctionnelle ou technique qui sera repris ultérieurement * un envoie d'évènement pour notifier l'échec à un service tiers. **Prefetch Policy :** On peut surcharger la prefetch policy dans la configuration jms spring boot ou sinon via l'url de connection ?jms.prefetchPolicy.all=1 Par défaut, elle vaut N=1000 et donc les consommateurs préfetch 1000 message et au bout de N/2,il en réserve N/2. Il vaut mieux commencer avec prefetchPolicy à 1 puis en cas de besoin de performance juger de l'utilité de ce paramètre. C'est d'autant plus vrai que les services consommateurs auraient un traitement long. En effet, imaginons une queue avec 1000 messages pour lequel on rallumerait plusieurs instances de programme consommateur alors le 1er qui arrive réservera les 1000 messages laissant les autres instances sans travail. ## 4.1.2 Envoi d'un évènement: Il suffit d'autowired dans un bean, la jmsTemplate qui convient pour l'envoi du messages. ``` jmsTemplate.send("nomQueue ou topic" session -> { TextMessage txtmessage = session.createTextMessage(message); return txtmessage; }); ``` On peut faire beaucoup plus de chose sur le message avec des headers etc et il existe plusieurs méthodes. Dans le cas d'une méthode qui enregistre quelque chose en base et envoie un message dans la foulée il faut jouer avec les transactions spring. ``` @Transactional public maMethode(objetMetier){ dao.save(objet); jmsTemplate.send(objet); } ``` A vous de voir si vous préférez catcher autour de la méthode send en préférant, en cas d'erreur sur l'envoi du message, faire un rollback en base ou non. ## 4.2 Un exemple dans le cas particulier de requete/reponse: Pour implémenter une api synchrone sur worflow asynchrone et donc faire quelque chose de proche sous l'angle de request/reponse mais moins d'un point de vue sécurité que d'un point de vue fonctionnelle, nous avons mis un exemple dans le programme https://gitlab.insee.fr/animation-developpement/etudes-techniques/broker/ws-producteur. Voici ce que cela donne: ![](https://i.imgur.com/b3S2T7H.png) 1. Un client arrive et fait une requete http sur l'ensemble des 3 tomcats représentant une api métier. Le F5 redirige sur l'un d'entre eux qui se charge de la requete. 2. Le tomcat utilise son pool de thread http (200 threads) et un timeout d'1 minute pour essayer de répondre à la requete http.L'endpoint consulté crée un DeferredResult dans un pool async ce qui libère le thread http. le message a un jmsCorrelationId le liant au deferredResult qui est géré applicativement. Le message est muni d'un header de routing afin de savoir quel tomcat est émetteur. 3 et 4. Un consommateur prend en charge le traitement du message et constitue une réponse dans une queue de reponse en reportant le header de routing. 5.Chaque tomcat a un listener en charge d'écouter la queue de réponse avec un selector pour son routingId afin qu'il ne traite que les messages dont il est émetteur. Ainsi contrairement à la version historique le jmsListener et le(s) consommateur(s) associés ont une durée de vie longue et stable vis à vis du broker. 8. Soit la réponse est revenue dans un temps accepté fonctionnellement configurable sur le deferredResult et le retour http est fait, soit la réponse n'est pas arrivé à temps et sera ignoré, le defferedresult renverra un resultat partiel l'utilisateur retombera sur un workflow asynchrone. Remarquez que dans cette situation : - on pourrait avoir un workflow mettant en jeu plus de services et de queue. ![](https://i.imgur.com/6XXbk9K.png) - on peut aussi renvoyé plusieurs réponses pour refléter l'avancement du workflow en envoyant des évènements intermédiaire dans queue.appli.responses pour un jmsCorrelationId qui ne fermera pas le deferredResult. ## 5 Un mot sur event sourcing et Cqrs. 2 patterns sont à noter et souvent lié au broker. ### 5.1 CQRS Le pattern CQRS (Command Query Response Segregation) a pour objectif dans un système de séparer les responsabilités entre les écritures et les lectures. Pour démystifier le sujet, on parle d'un pattern déjà existant à l'Insee. En effet, un système prenant les écritures sur une api sur une base et restituant en lecture sur une autre api qui tape sur une base clonée est une manière de faire du cqrs pourvu que le mécanisme de synchronisation des bases et le temps de réplication conviennent. Pour autant, selon les critères de performance et disponibilité attendues sur le système cela peut etre fait à plusieurs niveaux comme bien imagé ici. https://github.com/ddd-by-examples/all-things-cqrs Un broker d'évènement est une manière de transporter les informations de synchronisation. - Une api en écriture peut écrire dans une database A et émettre des évènements dans un broker. - Une api dont la responsabilité est de rendre en lecture peut consommer les évènements et mettre à jour sa database B pour restituer les informations en lecture. Contrairement à un clone d'infra de base de données: - les donnnées peuvent etre modélisées pour le use case associé (par exemple dénormalisée en lecture) - la database peut etre différente (par exemple un moteur d'indexation si la lecture nécessite des fonctions d'indexation) ou tuner en fonction de la problématique. - la solution est indépendante de procédure de clone d'infra qui bien souvent fige les architectures. - plutot que sauvegarder la base de l'api en lecture on peut imaginer un mécanisme de reconstruction depuis la source en écriture. Le revers de la médaille de cette flexibilité applicative est d'assurer la cohérence entre les différentes données. Pour cela une pratique est de faire correspondre des versions sur les entités transmises dans les évènements. ### 5.2 Event Sourcing L'event sourcing est un autre pattern d'architecture qui consiste à organiser et modéliser son système en considérant que tout ce qui arrive à un instant précis est le résultat d’une succession d’événements. Pour cela, on alimente souvent une pièce centreale qu'on appelle event store immuable et exhaustif qu'on alimente en fonction de l'arrivée des évènements. Sur ces streams d'évènements, on construit des entités métiers comme des aggrégations de la succesion d'évènement sur lesquels le système : * Prend des decisions métiers * Bien souvent inscrit l'état final de l'entité dans un système en lecture. Exemple une entreprise peut etre modélisé par la suite des évènements qu'elle a subie, son état final en est une résultante. ![](https://i.imgur.com/fJIB0qd.png) A noter que bien souvent la prise de décison nécessite aussi une version de l'évènement pour s'assurer lorsqu'on prend une décision et qu'on ajoute un évènement, qu'entre temps un autre traitement n'a modifié le stream. C'est pourquoi l'on trouve régulièrment les 2 patterns eventsourcing et cqrs associés. ![](https://i.imgur.com/5Z7wqP1.png) L'eventsourcing peut mettre en jeu un broker ou non mais constitue en soit un challenge avec un paradigme sur la conception à s'approprier. Paradigme d'autant plus complexe qu'il faudra en partager la vision avec l'équipe MOA. Il faudra aussi au long de la vie de l'application envisager des traitements pour rendre périodiquement l'eventStore plus compact.