--- tags: kenzo,cosmosdb,spec title : Connecteur CosmosDB description: --- # Spécifications générales Connecteur CosmodDB pour DataStage [toc] ## 1. Introduction **Besoin**: développement d'un connecteur Azure CosmosDB pour l'ETL DataStage pour y effectuer des opérations de lecture, écriture et administration. Les api/sdk DataStage permettant cela étant en Java, ce connecteur doit être développé en Java. **Moyens/Contraintes**: * SDK CosmosDB Java dernière version (4.?) * SDK DataStage disponible avec les clients ou serveurs DataStage. :::warning La libraire DataStage n'est pas disponible dans un repository public. Il faudra la copier dans les dépendances/libraires du projet manuellement. ::: ## 2. Capacités Le connecteur aura les capacités suivantes: * Lecture: extraction totale ou filtrée (statique) de collections * Ecriture: insert/update de documents dans une collection. * Sparse: extraction filtrée de collections. Les paramètres du filtres sont fournis par les données entrantes du connecteurs (dynamique). ## 3. Cinématique Le principe général sera d'utiliser le SDK asynchrone afin d'exploiter au maximum les capacités de CosmosDB en termes de débit. En cas de surconsommation du fait d'accès concurrents, le SDK effectuera le nombre de retrys paramétrés. Il s'agira également de provisionner au préalable un débit maximal paramétré et de le redescendre une fois les opérations terminées. :::warning Il ne faudra toutefois pas baisser le débit si celui-ci a été monté par une autre application. Les API permettent éventuellement de vérifier certaines informations (api monitoring ?) ::: ````mermaid sequenceDiagram Connecteur ->> Cosmos DB: Connexion Connecteur ->> Cosmos DB: Vérification/tuning capacités Note left of Connecteur: Les opérations sont soumises<br/>de manière asynchrone. <br/>Dans une limite paramétrable <br/>d'opérations simultanées. loop Opérations Connecteur ->> Cosmos DB: Opération asynchrone Cosmos DB -->> Connecteur: Ok Cosmos DB --x Connecteur: RequestRateTooLarge Note over Connecteur,Cosmos DB: En cas de dépassement de capacité<br/>on attends le temps retourné<br/> avant de soumettre l'opération à nouveau<br/>Géré de manière implicite par le SDK<br/>Le nombre de retry max est à paramétrer<br/> (pertinent en cas d'accès concurrentiels) alt 429 Connecteur -->> Connecteur: Wait x-ms-retry-after-ms Connecteur ->> Cosmos DB: retry end end Note over Connecteur,Cosmos DB: Toutes les opérations sont terminées Connecteur ->> Cosmos DB: Vérification concurrence<br/>tuning down des capacités ```` ## 4. Paramètres Conserver les paramètres existants: * Communs: * Host * Password * Database * Collection * DocumentId * Parallélisme maximal * Tuning capacités * Sérialisation/désérialisation ? * En lecture: * Sélection * Filter * Nom(s) des colonnes DataStage qui contiendront les données * En Écriture * WriteMode * Nom(s) des colonnes DataStage qui contiennent les données Plus tout les paramètres permis par le sdk cosmosDb. ## 5. Logs Les logs basiques: * Rappel des paramètres * Validation de connexion * Message d'erreur * Métriques Sont à tracer via l'api DataStage afin d'être facilement accessibles par l'exploitation. Pour les logs plus verbeuses et de debug, utiliser le framework ad-hoc pour de l'écriture en fichier plat. Ces logs seront activables par une variable d'environnement. ## 6. Structure code source Le code ci-dessous présente la structure générique d'un job lisant/produisant des données. **Fichier `src\main\java\com\ibm\is\cc\javastage\cosmosDbStage\cosmosDbStage.java`**: ```java= // Nom du stage package com.ibm.is.cc.javastage.cosmosDbStage; // import java import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Properties; // dépendances datastage import com.ibm.is.cc.javastage.api.Capabilities; import com.ibm.is.cc.javastage.api.ColumnMetadata; import com.ibm.is.cc.javastage.api.ColumnMetadataImpl; import com.ibm.is.cc.javastage.api.Configuration; import com.ibm.is.cc.javastage.api.InputLink; import com.ibm.is.cc.javastage.api.InputRecord; import com.ibm.is.cc.javastage.api.Link; import com.ibm.is.cc.javastage.api.OutputLink; import com.ibm.is.cc.javastage.api.OutputRecord; import com.ibm.is.cc.javastage.api.Processor; import com.ibm.is.cc.javastage.api.RejectRecord; import com.ibm.is.cc.javastage.api.PropertyDefinition; import com.ibm.is.cc.javastage.api.Logger; // autres dépendances // ... public class cosmosDbStage extends Processor { // définition liens entrants/sortants/rejets private InputLink m_inputLink; private OutputLink m_outputLink; private OutputLink m_rejectLink; // définition paramètres private String exempleOption; // définition métriques private int inputRecords = 0; private int outputRecords = 0; private int rejectRecords = 0; private int dropRecords = 0; private int m_nodeID = -1; public cosmosDbStage() { super(); Logger.setComponentID("cosmosDbStage"); } // définition des capacités du stage // nombre de liens entrants/sortants etc... public Capabilities getCapabilities() { Capabilities capabilities = new Capabilities(); capabilities.setMinimumInputLinkCount(1); capabilities.setMaximumInputLinkCount(1); capabilities.setMinimumOutputStreamLinkCount(1); capabilities.setMaximumOutputStreamLinkCount(1); capabilities.setMaximumRejectLinkCount(1); capabilities.setIsWaveGenerator(false); capabilities.setIsRunOnConductor(true); return capabilities; } // création et vérification des propriétés public List < PropertyDefinition > getUserPropertyDefinitions() { List < PropertyDefinition > propList = new ArrayList < PropertyDefinition > (); propList.add(new PropertyDefinition("exempleOption", "Valeur par défaut", "exempleOption", "Description", PropertyDefinition.Scope.STAGE)); return propList; } public boolean validateConfiguration(Configuration configuration, boolean isRuntime) throws Exception { m_nodeID = configuration.getNodeNumber(); m_inputLink = configuration.getInputLink(0); m_outputLink = configuration.getOutputLink(0); m_rejectLink = m_inputLink.getAssociatedRejectLink(); Properties userStageProperties = configuration.getUserProperties(); exempleOption = userStageProperties.getProperty("exempleOption"); if (m_nodeID == -1) { Logger.information(1,"exempleOption:" + exempleOption); } // logique de validation // si non valide: // Logger.fatal("Invalid parameter"); // terminate(true); return true; } // défintion des nouvelles colonnes en sortie public List < ColumnMetadata > getAdditionalOutputColumns(Link outputLink, List < Link > inputLinks, Properties stageProperties) { List < ColumnMetadata > addtionalColumns = new ArrayList < ColumnMetadata > (); ColumnMetadataImpl output = new ColumnMetadataImpl("FieldOutput", ColumnMetadata.SQL_TYPE_VARCHAR); output.setNullable(true); addtionalColumns.add(output); return addtionalColumns; } public void process() throws Exception { // initialisations // connexion cosmosDb // .. Logger.information(10,"Processing input data"); // boucle les enregistrements entrants do { InputRecord inputRecord = m_inputLink.readRecord(); if (inputRecord == null) { // No more input break; } // lecture colonne entrée String inputData = (String) inputRecord.getValue("inputField"); inputRecords++; // exemple debug/warning/fatal if (Logger.isDebugEnabled()) { Logger.debug(99,"message"); } Logger.warning(1,"Warning message"); Logger.fatal(1,"fatal message"); //exemple rejet rejRecord.setErrorText("Invalid data"); rejRecord.setErrorCode(1); m_rejectLink.writeRecord(rejRecord); rejectRecords++; // // écriture sortie outputRecord.setValue("outputField", dataOut); m_outputLink.writeRecord(outputRecord); outputRecords++; } while ( true ); Logger.information(100,"Input records:"+inputRecords); Logger.information(100,"Output records:"+outputRecords); Logger.information(100,"Rejected records:"+rejectRecords); Logger.information(100,"Droped records:"+dropRecords); } } ``` ## 7. Configuration projet On peut choisir Ant ou Maven. Ant est intégré au client Manager de DataStage. ### 7.1 Ant 1. Placer le fichier ccjava-api.jar et toutes les autres dépendances dans `lib` 2. **Fichier `build.xml`**: ```xml <?xml version="1.0" encoding="UTF-8" ?> <project name="jsonataStage" default="build" basedir="."> <property environment="env"/> <property name="src" value="${basedir}/src/main/java"/> <property name="classes" value="${basedir}/classes"/> <property name="javadoc" value="${basedir}/javadoc"/> <property name="jars" value="${basedir}/jars"/> <property name="api.jar" value="${basedir}/lib/ccjava-api.jar"/> <property name="deps.jar" value="${basedir}/lib/deps.jar"/> <property name="out.jar" value="${jars}/cosmosDbStage.jar"/> <path id="build.classpath"> <pathelement location="${api.jar}"/> <pathelement location="${deps.jar}"/> <pathelement location="${classes}"/> </path> <target name="build"> <mkdir dir="${jars}"/> <mkdir dir="${classes}"/> <mkdir dir="${javadoc}"/> <javac srcdir="${src}" source="1.8" target="1.8" destdir="${classes}" classpathref="build.classpath" debug="true" deprecation="true" optimize="false"> </javac> <jar jarfile="${out.jar}"> <fileset dir="${classes}"> <include name="com/ibm/is/cc/javastage/**/**/*.class"/> <include name="com/ibm/is/cc/javastage/**d/bean/**/*.class"/> </fileset> </jar> <javadoc sourcepath="${src}" packagenames="com.ibm.is.cc.javastage.cosmosDbStage" classpathref="build.classpath" destdir="${javadoc}" version="true" use="true" windowtitle="jsonata stage"/> </target> <target name="clean"> <delete quiet="true" dir="${classes}"/> <delete quiet="true" dir="${jars}"/> <delete quiet="true" dir="${javadoc}"/> </target> </project> ``` ### 7.2. Maven 1. Placer le fichier ccjava-api.jar dans `lib` 2. **Fichier `pom.xml`**: ```xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.ibm.is.cc.javastage.cosmosDbStage</groupId> <artifactId>cosmosDbStage</artifactId> <version>1.0.0</version> <packaging>jar</packaging> <name>cosmosDbStage</name> <description>cosmosDb Stage</description> <dependencies> <dependency> <groupId>com.ibm.jsonata4java</groupId> <artifactId>JSONata4Java</artifactId> <version>1.5.0</version> </dependency> <dependency> <groupId>com.ibm.is.cc.javastage.api</groupId> <artifactId>datastage</artifactId> <version>1.0.0</version> <scope>system</scope> <systemPath>${basedir}/lib/ccjava-api.jar</systemPath> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.7</source> <target>1.7</target> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>2.18.1</version> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-source-plugin</artifactId> <version>2.2.1</version> <executions> <execution> <id>attach-sources</id> <goals> <goal>jar</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-javadoc-plugin</artifactId> <version>2.9.1</version> <executions> <execution> <id>attach-javadocs</id> <goals> <goal>jar</goal> </goals> </execution> </executions> </plugin> </plugins> </build> <organization> <name>Kenzo</name> </organization> </project> ```