---
tags: kenzo,cosmosdb,spec
title : Connecteur CosmosDB
description:
---
# Spécifications générales Connecteur CosmodDB pour DataStage
[toc]
## 1. Introduction
**Besoin**: développement d'un connecteur Azure CosmosDB pour l'ETL DataStage pour y effectuer des opérations de lecture, écriture et administration.
Les api/sdk DataStage permettant cela étant en Java, ce connecteur doit être développé en Java.
**Moyens/Contraintes**:
* SDK CosmosDB Java dernière version (4.?)
* SDK DataStage disponible avec les clients ou serveurs DataStage.
:::warning
La libraire DataStage n'est pas disponible dans un repository public. Il faudra la copier dans les dépendances/libraires du projet manuellement.
:::
## 2. Capacités
Le connecteur aura les capacités suivantes:
* Lecture: extraction totale ou filtrée (statique) de collections
* Ecriture: insert/update de documents dans une collection.
* Sparse: extraction filtrée de collections. Les paramètres du filtres sont fournis par les données entrantes du connecteurs (dynamique).
## 3. Cinématique
Le principe général sera d'utiliser le SDK asynchrone afin d'exploiter au maximum les capacités de CosmosDB en termes de débit. En cas de surconsommation du fait d'accès concurrents, le SDK effectuera le nombre de retrys paramétrés.
Il s'agira également de provisionner au préalable un débit maximal paramétré et de le redescendre une fois les opérations terminées.
:::warning
Il ne faudra toutefois pas baisser le débit si celui-ci a été monté par une autre application. Les API permettent éventuellement de vérifier certaines informations (api monitoring ?)
:::
````mermaid
sequenceDiagram
Connecteur ->> Cosmos DB: Connexion
Connecteur ->> Cosmos DB: Vérification/tuning capacités
Note left of Connecteur: Les opérations sont soumises<br/>de manière asynchrone. <br/>Dans une limite paramétrable <br/>d'opérations simultanées.
loop Opérations
Connecteur ->> Cosmos DB: Opération asynchrone
Cosmos DB -->> Connecteur: Ok
Cosmos DB --x Connecteur: RequestRateTooLarge
Note over Connecteur,Cosmos DB: En cas de dépassement de capacité<br/>on attends le temps retourné<br/> avant de soumettre l'opération à nouveau<br/>Géré de manière implicite par le SDK<br/>Le nombre de retry max est à paramétrer<br/> (pertinent en cas d'accès concurrentiels)
alt 429
Connecteur -->> Connecteur: Wait x-ms-retry-after-ms
Connecteur ->> Cosmos DB: retry
end
end
Note over Connecteur,Cosmos DB: Toutes les opérations sont terminées
Connecteur ->> Cosmos DB: Vérification concurrence<br/>tuning down des capacités
````
## 4. Paramètres
Conserver les paramètres existants:
* Communs:
* Host
* Password
* Database
* Collection
* DocumentId
* Parallélisme maximal
* Tuning capacités
* Sérialisation/désérialisation ?
* En lecture:
* Sélection
* Filter
* Nom(s) des colonnes DataStage qui contiendront les données
* En Écriture
* WriteMode
* Nom(s) des colonnes DataStage qui contiennent les données
Plus tout les paramètres permis par le sdk cosmosDb.
## 5. Logs
Les logs basiques:
* Rappel des paramètres
* Validation de connexion
* Message d'erreur
* Métriques
Sont à tracer via l'api DataStage afin d'être facilement accessibles par l'exploitation.
Pour les logs plus verbeuses et de debug, utiliser le framework ad-hoc pour de l'écriture en fichier plat. Ces logs seront activables par une variable d'environnement.
## 6. Structure code source
Le code ci-dessous présente la structure générique d'un job lisant/produisant des données.
**Fichier `src\main\java\com\ibm\is\cc\javastage\cosmosDbStage\cosmosDbStage.java`**:
```java=
// Nom du stage
package com.ibm.is.cc.javastage.cosmosDbStage;
// import java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
// dépendances datastage
import com.ibm.is.cc.javastage.api.Capabilities;
import com.ibm.is.cc.javastage.api.ColumnMetadata;
import com.ibm.is.cc.javastage.api.ColumnMetadataImpl;
import com.ibm.is.cc.javastage.api.Configuration;
import com.ibm.is.cc.javastage.api.InputLink;
import com.ibm.is.cc.javastage.api.InputRecord;
import com.ibm.is.cc.javastage.api.Link;
import com.ibm.is.cc.javastage.api.OutputLink;
import com.ibm.is.cc.javastage.api.OutputRecord;
import com.ibm.is.cc.javastage.api.Processor;
import com.ibm.is.cc.javastage.api.RejectRecord;
import com.ibm.is.cc.javastage.api.PropertyDefinition;
import com.ibm.is.cc.javastage.api.Logger;
// autres dépendances
// ...
public class cosmosDbStage extends Processor {
// définition liens entrants/sortants/rejets
private InputLink m_inputLink;
private OutputLink m_outputLink;
private OutputLink m_rejectLink;
// définition paramètres
private String exempleOption;
// définition métriques
private int inputRecords = 0;
private int outputRecords = 0;
private int rejectRecords = 0;
private int dropRecords = 0;
private int m_nodeID = -1;
public cosmosDbStage() {
super();
Logger.setComponentID("cosmosDbStage");
}
// définition des capacités du stage
// nombre de liens entrants/sortants etc...
public Capabilities getCapabilities() {
Capabilities capabilities = new Capabilities();
capabilities.setMinimumInputLinkCount(1);
capabilities.setMaximumInputLinkCount(1);
capabilities.setMinimumOutputStreamLinkCount(1);
capabilities.setMaximumOutputStreamLinkCount(1);
capabilities.setMaximumRejectLinkCount(1);
capabilities.setIsWaveGenerator(false);
capabilities.setIsRunOnConductor(true);
return capabilities;
}
// création et vérification des propriétés
public List < PropertyDefinition > getUserPropertyDefinitions() {
List < PropertyDefinition > propList = new ArrayList < PropertyDefinition > ();
propList.add(new PropertyDefinition("exempleOption", "Valeur par défaut", "exempleOption", "Description", PropertyDefinition.Scope.STAGE));
return propList;
}
public boolean validateConfiguration(Configuration configuration, boolean isRuntime) throws Exception {
m_nodeID = configuration.getNodeNumber();
m_inputLink = configuration.getInputLink(0);
m_outputLink = configuration.getOutputLink(0);
m_rejectLink = m_inputLink.getAssociatedRejectLink();
Properties userStageProperties = configuration.getUserProperties();
exempleOption = userStageProperties.getProperty("exempleOption");
if (m_nodeID == -1) {
Logger.information(1,"exempleOption:" + exempleOption);
}
// logique de validation
// si non valide:
// Logger.fatal("Invalid parameter");
// terminate(true);
return true;
}
// défintion des nouvelles colonnes en sortie
public List < ColumnMetadata > getAdditionalOutputColumns(Link outputLink, List < Link > inputLinks, Properties stageProperties) {
List < ColumnMetadata > addtionalColumns = new ArrayList < ColumnMetadata > ();
ColumnMetadataImpl output = new ColumnMetadataImpl("FieldOutput", ColumnMetadata.SQL_TYPE_VARCHAR);
output.setNullable(true);
addtionalColumns.add(output);
return addtionalColumns;
}
public void process() throws Exception {
// initialisations
// connexion cosmosDb
// ..
Logger.information(10,"Processing input data");
// boucle les enregistrements entrants
do {
InputRecord inputRecord = m_inputLink.readRecord();
if (inputRecord == null) {
// No more input
break;
}
// lecture colonne entrée
String inputData = (String) inputRecord.getValue("inputField");
inputRecords++;
// exemple debug/warning/fatal
if (Logger.isDebugEnabled()) {
Logger.debug(99,"message");
}
Logger.warning(1,"Warning message");
Logger.fatal(1,"fatal message");
//exemple rejet
rejRecord.setErrorText("Invalid data");
rejRecord.setErrorCode(1);
m_rejectLink.writeRecord(rejRecord);
rejectRecords++;
//
// écriture sortie
outputRecord.setValue("outputField", dataOut);
m_outputLink.writeRecord(outputRecord);
outputRecords++;
} while ( true );
Logger.information(100,"Input records:"+inputRecords);
Logger.information(100,"Output records:"+outputRecords);
Logger.information(100,"Rejected records:"+rejectRecords);
Logger.information(100,"Droped records:"+dropRecords);
}
}
```
## 7. Configuration projet
On peut choisir Ant ou Maven. Ant est intégré au client Manager de DataStage.
### 7.1 Ant
1. Placer le fichier ccjava-api.jar et toutes les autres dépendances dans `lib`
2. **Fichier `build.xml`**:
```xml
<?xml version="1.0" encoding="UTF-8" ?>
<project name="jsonataStage" default="build" basedir=".">
<property environment="env"/>
<property name="src" value="${basedir}/src/main/java"/>
<property name="classes" value="${basedir}/classes"/>
<property name="javadoc" value="${basedir}/javadoc"/>
<property name="jars" value="${basedir}/jars"/>
<property name="api.jar" value="${basedir}/lib/ccjava-api.jar"/>
<property name="deps.jar" value="${basedir}/lib/deps.jar"/>
<property name="out.jar" value="${jars}/cosmosDbStage.jar"/>
<path id="build.classpath">
<pathelement location="${api.jar}"/>
<pathelement location="${deps.jar}"/>
<pathelement location="${classes}"/>
</path>
<target name="build">
<mkdir dir="${jars}"/>
<mkdir dir="${classes}"/>
<mkdir dir="${javadoc}"/>
<javac srcdir="${src}"
source="1.8" target="1.8"
destdir="${classes}"
classpathref="build.classpath"
debug="true"
deprecation="true"
optimize="false">
</javac>
<jar jarfile="${out.jar}">
<fileset dir="${classes}">
<include name="com/ibm/is/cc/javastage/**/**/*.class"/>
<include name="com/ibm/is/cc/javastage/**d/bean/**/*.class"/>
</fileset>
</jar>
<javadoc sourcepath="${src}" packagenames="com.ibm.is.cc.javastage.cosmosDbStage" classpathref="build.classpath" destdir="${javadoc}" version="true" use="true" windowtitle="jsonata stage"/>
</target>
<target name="clean">
<delete quiet="true" dir="${classes}"/>
<delete quiet="true" dir="${jars}"/>
<delete quiet="true" dir="${javadoc}"/>
</target>
</project>
```
### 7.2. Maven
1. Placer le fichier ccjava-api.jar dans `lib`
2. **Fichier `pom.xml`**:
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ibm.is.cc.javastage.cosmosDbStage</groupId>
<artifactId>cosmosDbStage</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<name>cosmosDbStage</name>
<description>cosmosDb Stage</description>
<dependencies>
<dependency>
<groupId>com.ibm.jsonata4java</groupId>
<artifactId>JSONata4Java</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>com.ibm.is.cc.javastage.api</groupId>
<artifactId>datastage</artifactId>
<version>1.0.0</version>
<scope>system</scope>
<systemPath>${basedir}/lib/ccjava-api.jar</systemPath>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<organization>
<name>Kenzo</name>
</organization>
</project>
```