# Do Jupyter ao Oozie ###### tags: `jupyterhub` `oozie` Nesse guia vamos realizar os passos necessários para transformar o seu notebook feito no JupyterHub em um job que pode ser agendado para execução no Oozie. Vamos focar em um job Spark que será submetido no Yarn, embora não seja obrigatório que seu job tenha essas características. ## 1. JupyterHub Abaixo temos os passos que são executados do lado do JupyterHub para preparar o seu código para ser executado no Oozie. ### 1.1 Notebook Vamos utilizar o notebook abaixo como exemplo, com ele vamos conseguir exemplificar todas as particularidades do processo. ![notebook](https://i.imgur.com/HSNg9en.png) Abaixo está o código da figura do notebook acima. ```python= from pyspark.sql import SparkSession import numpy as np import util spark = SparkSession.builder.master('local[*]').getOrCreate() arr = np.array([[2,3], [2,8], [2,3],[4,5]]) df = util.np_to_df(spark, arr) df.show() df.write.mode('overwrite').parquet('data-test') ``` Temos também um arquivo util.py a parte. ```python= def np_to_df(spark, arr): sc = spark.sparkContext rdd1 = sc.parallelize(arr) rdd2 = rdd1.map(lambda x: [int(i) for i in x]) df = rdd2.toDF(["A", "B"]) return df ``` ### 1.2 Notebook para Python O próprio Jupyter possui um utilitário que transforma um arquivo notebook (`ipynb`) em arquivo python (`.py`). Considerando que o notebook tenha o nome `meu-job.ipynb` o comando abaixo transforma o arquivo em `.py`. ```bash= jupyter nbconvert meu-job.ipynb --to python [NbConvertApp] Converting notebook meu-job.ipynb to python [NbConvertApp] Writing 379 bytes to meu-job.py ``` O arquivo convertido fica dessa maneira. ```python= #!/usr/bin/env python # coding: utf-8 # In[1]: from pyspark.sql import SparkSession # In[2]: import numpy as np import util # In[3]: spark = SparkSession.builder.master('local[*]').getOrCreate() # In[4]: arr = np.array([[2,3], [2,8], [2,3],[4,5]]) # In[5]: df = util.np_to_df(spark, arr) df.show() # In[6]: df.write.mode('overwrite').parquet('data-test') ``` ### 1.3 Dependências Python Os outros arquivos `.py` necessários para que o notebook funcione são compactados em formato zip e disponibilizados passados durante o processo de submit ao Yarn, por enquanto vamos apenas compactar essas dependências. ```bash= # No comando abaixo nós compactamos todos os arquivos .py, exceto o main $ zip -x meu-job.py -r meu-job-deps.zip *.py adding: util.py (deflated 22%) ``` ### 1.4 Bibliotecas de terceiros As bibliotecas de terceiros, como o pyspark, pandas e numpy são disponibilizadas no job através do `conda`, para isso nos comandos abaixo vamos criar um environment específico para o job, instalar as dependências e empacotar o resultado em um arquivo `.tar.gz`. ```bash= # 1) Criamos o environment do conda $ conda create --name meu-job Collecting package metadata (current_repodata.json): done Solving environment: done ## Package Plan ## environment location: /home/jovyan/.condaenvs/meu-job Proceed ([y]/n)? y Preparing transaction: done Verifying transaction: done Executing transaction: done # # To activate this environment, use # # $ conda activate meu-job # # To deactivate an active environment, use # # $ conda deactivate ``` ```bash= # 2) Iniciar o conda $ conda init bash no change /opt/conda/condabin/conda no change /opt/conda/bin/conda no change /opt/conda/bin/conda-env no change /opt/conda/bin/activate no change /opt/conda/bin/deactivate no change /opt/conda/etc/profile.d/conda.sh no change /opt/conda/etc/fish/conf.d/conda.fish no change /opt/conda/shell/condabin/Conda.psm1 no change /opt/conda/shell/condabin/conda-hook.ps1 no change /opt/conda/lib/python3.7/site-packages/xontrib/conda.xsh no change /opt/conda/etc/profile.d/conda.csh modified /home/jovyan/.bashrc ==> For changes to take effect, close and re-open your current shell. <== ``` ```bash= # 3) Carregar a configuração no bash $ source ~/.bashrc # 4) Selecionamos o environment $ conda activate meu-job ``` ```bash= # 5) Instalamos as dependências com o conda $ conda install --yes python=3.7 numpy conda-pack ## Package Plan ## environment location: /home/jovyan/.condaenvs/meu-job added / updated specs: - conda-pack - numpy - python=3.7 The following packages will be downloaded: package | build ---------------------------|----------------- _libgcc_mutex-0.1 | conda_forge 3 KB conda-forge _openmp_mutex-4.5 | 1_gnu 22 KB conda-forge conda-pack-0.4.0 | py_0 25 KB conda-forge ld_impl_linux-64-2.34 | hc38a660_9 612 KB conda-forge libblas-3.8.0 | 17_openblas 11 KB conda-forge libcblas-3.8.0 | 17_openblas 11 KB conda-forge libffi-3.2.1 | he1b5a44_1007 47 KB conda-forge liblapack-3.8.0 | 17_openblas 11 KB conda-forge libopenblas-0.3.10 |pthreads_hb3c22a3_4 7.8 MB conda-forge python_abi-3.7 | 1_cp37m 4 KB conda-forge tk-8.6.10 | hed695b0_0 3.2 MB conda-forge wheel-0.34.2 | py_1 24 KB conda-forge zlib-1.2.11 | h516909a_1006 105 KB conda-forge ------------------------------------------------------------ Total: 11.9 MB ``` ```bash= # 6) Também podemos instalar dependências com o pip # Utilizamos o pyspark 2.3.2, pois é a versão configurada no ambiente $ pip install pyspark==2.3.2 Processing ./.cache/pip/wheels/bd/3f/bb/cee54d865446970f330979b926919bfb33db0782e636e8e7e7/pyspark-2.3.2-py2.py3-none-any.whl Collecting py4j==0.10.7 Using cached py4j-0.10.7-py2.py3-none-any.whl (197 kB) Installing collected packages: py4j, pyspark Successfully installed py4j-0.10.7 pyspark-2.3.2 ``` ```bash= # 7) Listamos as dependências instaladas $ conda list # packages in environment at /home/jovyan/.condaenvs/meu-job: # # Name Version Build Channel _libgcc_mutex 0.1 conda_forge conda-forge _openmp_mutex 4.5 1_gnu conda-forge ca-certificates 2020.6.20 hecda079_0 conda-forge certifi 2020.6.20 py37hc8dfbb8_0 conda-forge conda-pack 0.4.0 py_0 conda-forge ld_impl_linux-64 2.34 hc38a660_9 conda-forge libblas 3.8.0 17_openblas conda-forge libcblas 3.8.0 17_openblas conda-forge libffi 3.2.1 he1b5a44_1007 conda-forge libgcc-ng 9.3.0 h24d8f2e_14 conda-forge libgfortran-ng 7.5.0 hdf63c60_14 conda-forge libgomp 9.3.0 h24d8f2e_14 conda-forge liblapack 3.8.0 17_openblas conda-forge libopenblas 0.3.10 pthreads_hb3c22a3_4 conda-forge libstdcxx-ng 9.3.0 hdf63c60_14 conda-forge ncurses 6.2 he1b5a44_1 conda-forge numpy 1.19.1 py37h8960a57_0 conda-forge openssl 1.1.1g h516909a_1 conda-forge pip 20.2.1 py_0 conda-forge py4j 0.10.7 pypi_0 pypi pyspark 2.3.2 pypi_0 pypi python 3.7.8 h6f2ec95_1_cpython conda-forge python_abi 3.7 1_cp37m conda-forge readline 8.0 he28a2e2_2 conda-forge setuptools 49.2.1 py37hc8dfbb8_0 conda-forge sqlite 3.32.3 hcee41ef_1 conda-forge tk 8.6.10 hed695b0_0 conda-forge wheel 0.34.2 py_1 conda-forge xz 5.2.5 h516909a_1 conda-forge zlib 1.2.11 h516909a_1006 conda-forge ``` ```bash= # 8) Empacotamos tudo, será criado um arquivo meu-job.tar.gz $ conda pack -n meu-job Collecting packages... Packing environment at '/home/jovyan/.condaenvs/meu-job' to 'meu-job.tar.gz' [########################################] | 100% Completed | 20.3s ``` ### 1.5 Copiar arquivos para o HDFS Até o momento nós temos três artefatos: 1) Arquivo meu-job.py que foi gerado a partir do notebook; 2) Arquivo meu-job-deps.zip que foi gerado compactandos os outros arquivos python; 3) Arquivo meu-job.tar.gz que foi gerado com o `conda pack` compactando as dependências Todos esses arquivos serão acessados pelo Oozie através do HDFS, portanto o comando abaixo mostra como criar um diretório e copiar os arquivos para o HDFS via linha de comando. ```bash # Criação de um diretório para armazenar os arquivos $ hdfs dfs -mkdir -p /jobs/meu-job # Copiar os arquivos locais para o HDFS $ hdfs dfs -copyFromLocal \ meu-job.py meu-job-deps.zip meu-job.tar.gz \ /jobs/meu-job # Verifique se os arquivos estão todos lá $ hdfs dfs -ls -h /jobs/meu-job/ Found 3 items -rw-r--r-- 3 admin hdfs 297 2020-08-26 09:44 /jobs/meu-job/meu-job-deps.zip -rw-r--r-- 3 admin hdfs 444 2020-08-26 09:44 /jobs/meu-job/meu-job.py -rw-r--r-- 3 admin hdfs 303.2 M 2020-08-26 09:44 /jobs/meu-job/meu-job.tar.gz ``` ## 2. Oozie O Apache Oozie é responsável por definir workflows de processamento e executá-los de acordo com o agendamento realizado. Nesse guia vamos utilizá-lo para submeter o nosso job no Yarn. Embora o próprio Oozie já tenha uma opção direta para definir um workflow Spark, ele é um pouco limitado pois não nos deixa determinar o environment corretamente. Para contornar essa limitação, vamos utilizar um script bash que configura todos os artefatos e faz o submit no Yarn. ### 2.1 Submit script O script abaixo é bem simples e contém vários comentários para ajudar a entendê-lo. ```bash #!/bin/bash hdfs_host=127.0.0.1 hdfs_port=8020 hdfs_user=admin java_home="/usr/jdk64/jdk1.8.0_112" force_delete=false num_executors=2 driver_memory=1G driver_cores=4 executor_memory=1G executor_cores=4 # ----------------------------------------------------------------------------------------------- # # Help # # ----------------------------------------------------------------------------------------------- # function show_help() { echo "" echo "Usage: : ${0} -e <path> -p <path> -d [<dependências python em formato zip>] -a [argumentos do script python separados por espaço]" echo "" echo "Parâmetros:" echo " -e <env tar.gz file> - Caminho completo do arquivo environment no hdfs" echo " -p <python file> - Caminho completo do arquivo python no hdfs" echo " -d <python deps> - Caminho completo do arquivo zip no hdfs" echo " -a [script args] - Argumentos do script python" echo " -H [host do hdfs] - Host do HDFS (default: ${hdfs_host})" echo " -P [porta do hdfs] - Porta do HDFS (default: ${hdfs_port})" echo " -U [user do hdfs] - Usuário do HDFS (default: ${hdfs_user})" echo " -j [java home] - Java home (default: ${java_home})" echo " -s [submit mode] - Modo de submit: python (default) ou spark" echo " -x [executor memory] - Configuração do executor-memory do spark (default: ${executor_memory})" echo " -X [executor cores] - Configuração do executor-cores do spark (default: ${executor_cores})" echo " -r [driver memory] - Configuração do driver-memory do spark (default: ${driver_memory})" echo " -R [driver cores] - Configuração do driver-cores do spark (default: ${driver_cores})" echo " -n [num executors] - Configuração do num-executors do spark (default: ${num_executors})" echo " -F - Força a deleção do diretório run mesmo em caso de erro (default: false)" echo "" } # ----------------------------------------------------------------------------------------------- # # Funções # # ----------------------------------------------------------------------------------------------- # # random: retorna um número aleatório function random() { result=$(cat /dev/urandom | tr -dc '1-9999' | fold -w 256 | head -n 1 | head --bytes 4) echo ${result} } # download_hdfs_file: faz o download do HDFS para um diretório local function download_hdfs_file() { hdfs dfs -copyToLocal hdfs://${hdfs_host}:${hdfs_port}/${1} . } # check_env: verifica se a determinado parâmetro está configurado function check_env() { if [[ -z ${1} ]]; then echo ${2} show_help exit 1 fi } # ----------------------------------------------------------------------------------------------- # # Parâmetros # # ----------------------------------------------------------------------------------------------- # submit=python while getopts :h?:H::P::U::e::p::d::j::a::s::n::r::R::x::X::F FLAG; do case "${FLAG}" in h) show_help exit 0 ;; e) hdfs_env_file=${OPTARG} ;; p) hdfs_py_file=${OPTARG} ;; d) hdfs_py_deps=${OPTARG} ;; H) hdfs_host=${OPTARG} ;; U) hdfs_user=${OPTARG} ;; P) hdfs_port=${OPTARG} ;; j) java_home=${OPTARG} ;; a) declare -a args=("${OPTARG}") ;; s) submit=${OPTARG} ;; F) force_delete=true ;; n) num_executors=${OPTARG} ;; x) executor_memory=${OPTARG} ;; X) executor_cores=${OPTARG} ;; r) driver_memory=${OPTARG} ;; R) driver_cores=${OPTARG} ;; \?) show_help exit 0 ;; esac done check_env "${hdfs_env_file}" "Parâmetro -e (environment file) inválido" check_env "${hdfs_py_file}" "Parâmetro -p (python file) inválido" workdir=run_$(random) export HADOOP_USER_NAME=${hdfs_user} if [[ ! -z ${JAVA_HOME} ]]; then java_home=${JAVA_HOME} fi echo "" echo "hdfs host : ${hdfs_host}" echo "hdfs port : ${hdfs_port}" echo "hdfs user : ${hdfs_user}" echo "env remote file : ${hdfs_env_file}" echo "python remote file: ${hdfs_py_file}" echo "python remote deps: ${hdfs_py_deps}" echo "python args : ${args}" echo "submit mode : ${submit}" echo "num executors : ${num_executors}" echo "executor memory : ${executor_memory}" echo "executor cores : ${executor_cores}" echo "driver memory : ${driver_memory}" echo "driver cores : ${driver_cores}" echo "workdir : ${PWD}/${workdir}" echo "java home : ${java_home}" echo "force delete : ${force_delete}" echo "" if [[ ${submit} != "python" && ${submit} != "spark" ]]; then echo "submit-mode inválido ${submit} utilize python ou spark" show_help exit 1 fi # ----------------------------------------------------------------------------------------------- # # Execução # # ----------------------------------------------------------------------------------------------- # if [[ ${submit} == python ]]; then mkdir -p ${workdir} cd ${workdir} download_hdfs_file ${hdfs_env_file} download_hdfs_file ${hdfs_py_file} env_local_file=$(basename -- "${hdfs_env_file}") py_local_file=$(basename -- "${hdfs_py_file}") env_name=$(echo "${env_local_file}" | sed s/\.tar\.gz//g) if [[ ! -z ${hdfs_py_deps} ]]; then echo "Setting up ${hdfs_py_deps}" download_hdfs_file ${hdfs_py_deps} deps_local_file=$(basename -- "${hdfs_py_deps}") x=$(unzip ${deps_local_file} >> /dev/null 2>&1) fi echo "" echo "env local file: ${env_local_file}" echo "py local file : ${py_local_file}" if [[ ! -z ${deps_local_file} ]]; then echo "py local deps : ${deps_local_file}" fi echo "env name : ${env_name}" echo "" mkdir ${env_name} tar xfz ${env_local_file} -C ${env_name} export JAVA_HOME=${java_home} source ${env_name}/bin/activate set -x # conda list python ${py_local_file} ${args} result=${?} set +x source ${env_name}/bin/deactivate cd .. if [[ ${result} -eq 0 || ${force_delete} == "true" ]]; then rm -rf ${workdir} fi else set -x # export PYSPARK_PYTHON=./environment/bin/python py_deps="" if [[ ! -z ${hdfs_py_deps} ]]; then py_deps="--py-files hdfs://${hdfs_host}:${hdfs_port}${hdfs_py_deps}" fi spark-submit \ --executor-cores ${executor_cores} \ --num-executors ${num_executors} \ --driver-cores ${driver_cores} \ --driver-memory ${driver_memory} \ --executor-memory ${executor_memory} \ --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python \ --master yarn \ --deploy-mode cluster \ --archives "hdfs://${hdfs_host}:${hdfs_port}${hdfs_env_file}#environment" \ ${py_deps} hdfs://${hdfs_host}:${hdfs_port}${hdfs_py_file} ${args} set +x echo "" fi ``` O script também deve ser salvo no HDFS para que o Oozie tenha acesso. Para os próximos passos vamos considerar que ele está salvo no diretório `/scripts` com o nome `spark-submit.sh` ## 2.2 Execução no Oozie Agora vamos configurar tudo que fizemos até aqui no Oozie para finalmente executar o nosso job. O primeiro passo é criar um novo Workflow. ![Novo workflow](https://i.imgur.com/WHuEF0l.png) Criar um novo nó do tipo `shell`. ![Novo node](https://i.imgur.com/a0ZitAy.png) Editar o executor shell. ![Editar o node](https://i.imgur.com/D9qjFjd.png) Dentro do editor vamos selecionar no campo **exec*** o script que está salvo em `/scripts/spark-submit.sh` no HDFS. E passar os parâmetros necessários para executar o nosso job. ![Propriedades do shell](https://i.imgur.com/Ifteiy8.png) Outro ponto importante é que o script deve ser configurado na tab "Advanced Properties" no campo **File**. ![Advanced Properties - Script](https://i.imgur.com/oWw0OFM.png) Após salvar, vamos dar um nome ao nosso workflow. ![Nome do workflow](https://i.imgur.com/xfI1Psw.png) O último passo é realizar o submit, clique no botão `Submit` no canto superior direito. Selecione a opção "Run on submit" e desmarque a opção "Rerun on failure". Marque a opção "Overwrite" caso esteja fazendo uma atualização no seu workflow. Observe na imagem abaixo que estamos salvando o arquivo `workflow.xml` com a definição do nosso workflow junto dos arquivos do nosso job. ![Oozie submit](https://i.imgur.com/jH5nvs0.png) Pronto, nosso job será executado no Yarn. ## 3. Yarn Para verificar se a execução do job no Yarn devemos acessar o "Yarn Resource Manager UI", você pode acessá-lo através do link na interface do Ambari. ![Yarn - Resource Manager UI Link](https://i.imgur.com/qs0krRe.png) Na aba "Applications" expanda o *Application Name* para visualizar o seu job. Clique no link da coluna *Application ID* para mais detalhes. ![Applications list](https://i.imgur.com/2I6Z59e.png) **Caso receba um erro ao abrir a interface do Yarn, veja os passos da próxima seção.** Na tela de detalhes, você deve primeiro selecionar o campo de tentativas (*attempts*), na maioria dos casos terá apenas uma entrada pois a execução funcionou na primeira tentativa, por padrão o Yarn tenta três vezes. Depois deve escolher o container logs, como a execução é distribuída, geralmente os logs ficam divididos, selecione a última opção pois geralmente é onde está a saída do seu job. No exemplo abaixo selecionamos o segundo container (*container_e01_1596735718364_0031_01_000002*). O log é segmentado em vários tipos: directory.info, launch_container, stderr, stdout, syslog. Os mais importantes são o stderr e stdout, no exemplo abaixo selecionamos o stdout. ![yarn log](https://i.imgur.com/93fNccq.png) Ao clicar no link **here** podemos ver o log completo, ao rolar a tela para o final pode visualizar a saída do nosso job. Observe que temos o indicativo que deu tudo certo com o *exit code* igual a zero. ![yarn stdout log](https://i.imgur.com/OuEnobf.png) ## 4. Yarn com Kerberos ### 4.1 Negociação de chaves Os principais browser possuem um mecanismo para autenticar com o Kerberos utilizando a negociação de chaves. Para detalhes sobre cada navegador, acesse esse [link](https://active-directory-wp.com/docs/Networking/Single_Sign_On/Configure_browsers_to_use_Kerberos.html). Entretanto, caso o seu usuário do Windows não seja autenticado pelo mesmo Active Directory configurado no Yarn, a negociação da chave irá falhar, pois a chave do seu login não será válida no contexto do Yarn. ### 4.2 Acessando apenas o serviço de timeline Caso esteja na situação que não tem acesso ao `ResourceManager` você pode optar por acessar diretamente o serviço de timeline para visualização dos logs. O primeiro passo é desabilitar a autenticação do kerberos para esse serviço. ![yarn-timeline-simple-auth](https://i.imgur.com/Fspni4Z.png) O segundo passo é reiniciar todo o ambiente, porque pode ser que os serviços fiquem em estado inconsistente. Agora precisamos descobrir qual é o endereço do timeline. ![yarn-timeline-address](https://i.imgur.com/eIa4j1v.png) Acessando o endereço obtido acima, você consegue visualizar a lista de aplicações. ![yarn-timeline-page](https://i.imgur.com/u2jGOyl.png) Selecionando o link da imagem acima, podemos chegar nos logs. ![yarn-timeline-logs](https://i.imgur.com/VRYgxDj.png) Entrando na página do log, geralmente o último é do tipo `stdout`, veja abaixo um exemplo. ![yarn-timeline-log-stdout](https://i.imgur.com/1sVU7ns.png)