# Do Jupyter ao Oozie
###### tags: `jupyterhub` `oozie`
Nesse guia vamos realizar os passos necessários para transformar o seu notebook feito no JupyterHub em um job que pode ser agendado para execução no Oozie.
Vamos focar em um job Spark que será submetido no Yarn, embora não seja obrigatório que seu job tenha essas características.
## 1. JupyterHub
Abaixo temos os passos que são executados do lado do JupyterHub para preparar o seu código para ser executado no Oozie.
### 1.1 Notebook
Vamos utilizar o notebook abaixo como exemplo, com ele vamos conseguir exemplificar todas as particularidades do processo.

Abaixo está o código da figura do notebook acima.
```python=
from pyspark.sql import SparkSession
import numpy as np
import util
spark = SparkSession.builder.master('local[*]').getOrCreate()
arr = np.array([[2,3], [2,8], [2,3],[4,5]])
df = util.np_to_df(spark, arr)
df.show()
df.write.mode('overwrite').parquet('data-test')
```
Temos também um arquivo util.py a parte.
```python=
def np_to_df(spark, arr):
sc = spark.sparkContext
rdd1 = sc.parallelize(arr)
rdd2 = rdd1.map(lambda x: [int(i) for i in x])
df = rdd2.toDF(["A", "B"])
return df
```
### 1.2 Notebook para Python
O próprio Jupyter possui um utilitário que transforma um arquivo notebook (`ipynb`) em arquivo python (`.py`).
Considerando que o notebook tenha o nome `meu-job.ipynb` o comando abaixo transforma o arquivo em `.py`.
```bash=
jupyter nbconvert meu-job.ipynb --to python
[NbConvertApp] Converting notebook meu-job.ipynb to python
[NbConvertApp] Writing 379 bytes to meu-job.py
```
O arquivo convertido fica dessa maneira.
```python=
#!/usr/bin/env python
# coding: utf-8
# In[1]:
from pyspark.sql import SparkSession
# In[2]:
import numpy as np
import util
# In[3]:
spark = SparkSession.builder.master('local[*]').getOrCreate()
# In[4]:
arr = np.array([[2,3], [2,8], [2,3],[4,5]])
# In[5]:
df = util.np_to_df(spark, arr)
df.show()
# In[6]:
df.write.mode('overwrite').parquet('data-test')
```
### 1.3 Dependências Python
Os outros arquivos `.py` necessários para que o notebook funcione são compactados em formato zip e disponibilizados passados durante o processo de submit ao Yarn, por enquanto vamos apenas compactar essas dependências.
```bash=
# No comando abaixo nós compactamos todos os arquivos .py, exceto o main
$ zip -x meu-job.py -r meu-job-deps.zip *.py
adding: util.py (deflated 22%)
```
### 1.4 Bibliotecas de terceiros
As bibliotecas de terceiros, como o pyspark, pandas e numpy são disponibilizadas no job através do `conda`, para isso nos comandos abaixo vamos criar um environment específico para o job, instalar as dependências e empacotar o resultado em um arquivo `.tar.gz`.
```bash=
# 1) Criamos o environment do conda
$ conda create --name meu-job
Collecting package metadata (current_repodata.json): done
Solving environment: done
## Package Plan ##
environment location: /home/jovyan/.condaenvs/meu-job
Proceed ([y]/n)? y
Preparing transaction: done
Verifying transaction: done
Executing transaction: done
#
# To activate this environment, use
#
# $ conda activate meu-job
#
# To deactivate an active environment, use
#
# $ conda deactivate
```
```bash=
# 2) Iniciar o conda
$ conda init bash
no change /opt/conda/condabin/conda
no change /opt/conda/bin/conda
no change /opt/conda/bin/conda-env
no change /opt/conda/bin/activate
no change /opt/conda/bin/deactivate
no change /opt/conda/etc/profile.d/conda.sh
no change /opt/conda/etc/fish/conf.d/conda.fish
no change /opt/conda/shell/condabin/Conda.psm1
no change /opt/conda/shell/condabin/conda-hook.ps1
no change /opt/conda/lib/python3.7/site-packages/xontrib/conda.xsh
no change /opt/conda/etc/profile.d/conda.csh
modified /home/jovyan/.bashrc
==> For changes to take effect, close and re-open your current shell. <==
```
```bash=
# 3) Carregar a configuração no bash
$ source ~/.bashrc
# 4) Selecionamos o environment
$ conda activate meu-job
```
```bash=
# 5) Instalamos as dependências com o conda
$ conda install --yes python=3.7 numpy conda-pack
## Package Plan ##
environment location: /home/jovyan/.condaenvs/meu-job
added / updated specs:
- conda-pack
- numpy
- python=3.7
The following packages will be downloaded:
package | build
---------------------------|-----------------
_libgcc_mutex-0.1 | conda_forge 3 KB conda-forge
_openmp_mutex-4.5 | 1_gnu 22 KB conda-forge
conda-pack-0.4.0 | py_0 25 KB conda-forge
ld_impl_linux-64-2.34 | hc38a660_9 612 KB conda-forge
libblas-3.8.0 | 17_openblas 11 KB conda-forge
libcblas-3.8.0 | 17_openblas 11 KB conda-forge
libffi-3.2.1 | he1b5a44_1007 47 KB conda-forge
liblapack-3.8.0 | 17_openblas 11 KB conda-forge
libopenblas-0.3.10 |pthreads_hb3c22a3_4 7.8 MB conda-forge
python_abi-3.7 | 1_cp37m 4 KB conda-forge
tk-8.6.10 | hed695b0_0 3.2 MB conda-forge
wheel-0.34.2 | py_1 24 KB conda-forge
zlib-1.2.11 | h516909a_1006 105 KB conda-forge
------------------------------------------------------------
Total: 11.9 MB
```
```bash=
# 6) Também podemos instalar dependências com o pip
# Utilizamos o pyspark 2.3.2, pois é a versão configurada no ambiente
$ pip install pyspark==2.3.2
Processing ./.cache/pip/wheels/bd/3f/bb/cee54d865446970f330979b926919bfb33db0782e636e8e7e7/pyspark-2.3.2-py2.py3-none-any.whl
Collecting py4j==0.10.7
Using cached py4j-0.10.7-py2.py3-none-any.whl (197 kB)
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.3.2
```
```bash=
# 7) Listamos as dependências instaladas
$ conda list
# packages in environment at /home/jovyan/.condaenvs/meu-job:
#
# Name Version Build Channel
_libgcc_mutex 0.1 conda_forge conda-forge
_openmp_mutex 4.5 1_gnu conda-forge
ca-certificates 2020.6.20 hecda079_0 conda-forge
certifi 2020.6.20 py37hc8dfbb8_0 conda-forge
conda-pack 0.4.0 py_0 conda-forge
ld_impl_linux-64 2.34 hc38a660_9 conda-forge
libblas 3.8.0 17_openblas conda-forge
libcblas 3.8.0 17_openblas conda-forge
libffi 3.2.1 he1b5a44_1007 conda-forge
libgcc-ng 9.3.0 h24d8f2e_14 conda-forge
libgfortran-ng 7.5.0 hdf63c60_14 conda-forge
libgomp 9.3.0 h24d8f2e_14 conda-forge
liblapack 3.8.0 17_openblas conda-forge
libopenblas 0.3.10 pthreads_hb3c22a3_4 conda-forge
libstdcxx-ng 9.3.0 hdf63c60_14 conda-forge
ncurses 6.2 he1b5a44_1 conda-forge
numpy 1.19.1 py37h8960a57_0 conda-forge
openssl 1.1.1g h516909a_1 conda-forge
pip 20.2.1 py_0 conda-forge
py4j 0.10.7 pypi_0 pypi
pyspark 2.3.2 pypi_0 pypi
python 3.7.8 h6f2ec95_1_cpython conda-forge
python_abi 3.7 1_cp37m conda-forge
readline 8.0 he28a2e2_2 conda-forge
setuptools 49.2.1 py37hc8dfbb8_0 conda-forge
sqlite 3.32.3 hcee41ef_1 conda-forge
tk 8.6.10 hed695b0_0 conda-forge
wheel 0.34.2 py_1 conda-forge
xz 5.2.5 h516909a_1 conda-forge
zlib 1.2.11 h516909a_1006 conda-forge
```
```bash=
# 8) Empacotamos tudo, será criado um arquivo meu-job.tar.gz
$ conda pack -n meu-job
Collecting packages...
Packing environment at '/home/jovyan/.condaenvs/meu-job' to 'meu-job.tar.gz'
[########################################] | 100% Completed | 20.3s
```
### 1.5 Copiar arquivos para o HDFS
Até o momento nós temos três artefatos:
1) Arquivo meu-job.py que foi gerado a partir do notebook;
2) Arquivo meu-job-deps.zip que foi gerado compactandos os outros arquivos python;
3) Arquivo meu-job.tar.gz que foi gerado com o `conda pack` compactando as dependências
Todos esses arquivos serão acessados pelo Oozie através do HDFS, portanto o comando abaixo mostra como criar um diretório e copiar os arquivos para o HDFS via linha de comando.
```bash
# Criação de um diretório para armazenar os arquivos
$ hdfs dfs -mkdir -p /jobs/meu-job
# Copiar os arquivos locais para o HDFS
$ hdfs dfs -copyFromLocal \
meu-job.py meu-job-deps.zip meu-job.tar.gz \
/jobs/meu-job
# Verifique se os arquivos estão todos lá
$ hdfs dfs -ls -h /jobs/meu-job/
Found 3 items
-rw-r--r-- 3 admin hdfs 297 2020-08-26 09:44 /jobs/meu-job/meu-job-deps.zip
-rw-r--r-- 3 admin hdfs 444 2020-08-26 09:44 /jobs/meu-job/meu-job.py
-rw-r--r-- 3 admin hdfs 303.2 M 2020-08-26 09:44 /jobs/meu-job/meu-job.tar.gz
```
## 2. Oozie
O Apache Oozie é responsável por definir workflows de processamento e executá-los de acordo com o agendamento realizado.
Nesse guia vamos utilizá-lo para submeter o nosso job no Yarn.
Embora o próprio Oozie já tenha uma opção direta para definir um workflow Spark, ele é um pouco limitado pois não nos deixa determinar o environment corretamente. Para contornar essa limitação, vamos utilizar um script bash que configura todos os artefatos e faz o submit no Yarn.
### 2.1 Submit script
O script abaixo é bem simples e contém vários comentários para ajudar a entendê-lo.
```bash
#!/bin/bash
hdfs_host=127.0.0.1
hdfs_port=8020
hdfs_user=admin
java_home="/usr/jdk64/jdk1.8.0_112"
force_delete=false
num_executors=2
driver_memory=1G
driver_cores=4
executor_memory=1G
executor_cores=4
# ----------------------------------------------------------------------------------------------- #
# Help #
# ----------------------------------------------------------------------------------------------- #
function show_help() {
echo ""
echo "Usage: : ${0} -e <path> -p <path> -d [<dependências python em formato zip>] -a [argumentos do script python separados por espaço]"
echo ""
echo "Parâmetros:"
echo " -e <env tar.gz file> - Caminho completo do arquivo environment no hdfs"
echo " -p <python file> - Caminho completo do arquivo python no hdfs"
echo " -d <python deps> - Caminho completo do arquivo zip no hdfs"
echo " -a [script args] - Argumentos do script python"
echo " -H [host do hdfs] - Host do HDFS (default: ${hdfs_host})"
echo " -P [porta do hdfs] - Porta do HDFS (default: ${hdfs_port})"
echo " -U [user do hdfs] - Usuário do HDFS (default: ${hdfs_user})"
echo " -j [java home] - Java home (default: ${java_home})"
echo " -s [submit mode] - Modo de submit: python (default) ou spark"
echo " -x [executor memory] - Configuração do executor-memory do spark (default: ${executor_memory})"
echo " -X [executor cores] - Configuração do executor-cores do spark (default: ${executor_cores})"
echo " -r [driver memory] - Configuração do driver-memory do spark (default: ${driver_memory})"
echo " -R [driver cores] - Configuração do driver-cores do spark (default: ${driver_cores})"
echo " -n [num executors] - Configuração do num-executors do spark (default: ${num_executors})"
echo " -F - Força a deleção do diretório run mesmo em caso de erro (default: false)"
echo ""
}
# ----------------------------------------------------------------------------------------------- #
# Funções #
# ----------------------------------------------------------------------------------------------- #
# random: retorna um número aleatório
function random() {
result=$(cat /dev/urandom | tr -dc '1-9999' | fold -w 256 | head -n 1 | head --bytes 4)
echo ${result}
}
# download_hdfs_file: faz o download do HDFS para um diretório local
function download_hdfs_file() {
hdfs dfs -copyToLocal hdfs://${hdfs_host}:${hdfs_port}/${1} .
}
# check_env: verifica se a determinado parâmetro está configurado
function check_env() {
if [[ -z ${1} ]]; then
echo ${2}
show_help
exit 1
fi
}
# ----------------------------------------------------------------------------------------------- #
# Parâmetros #
# ----------------------------------------------------------------------------------------------- #
submit=python
while getopts :h?:H::P::U::e::p::d::j::a::s::n::r::R::x::X::F FLAG; do
case "${FLAG}" in
h)
show_help
exit 0
;;
e)
hdfs_env_file=${OPTARG}
;;
p)
hdfs_py_file=${OPTARG}
;;
d)
hdfs_py_deps=${OPTARG}
;;
H)
hdfs_host=${OPTARG}
;;
U)
hdfs_user=${OPTARG}
;;
P)
hdfs_port=${OPTARG}
;;
j)
java_home=${OPTARG}
;;
a)
declare -a args=("${OPTARG}")
;;
s)
submit=${OPTARG}
;;
F)
force_delete=true
;;
n)
num_executors=${OPTARG}
;;
x)
executor_memory=${OPTARG}
;;
X)
executor_cores=${OPTARG}
;;
r)
driver_memory=${OPTARG}
;;
R)
driver_cores=${OPTARG}
;;
\?)
show_help
exit 0
;;
esac
done
check_env "${hdfs_env_file}" "Parâmetro -e (environment file) inválido"
check_env "${hdfs_py_file}" "Parâmetro -p (python file) inválido"
workdir=run_$(random)
export HADOOP_USER_NAME=${hdfs_user}
if [[ ! -z ${JAVA_HOME} ]]; then
java_home=${JAVA_HOME}
fi
echo ""
echo "hdfs host : ${hdfs_host}"
echo "hdfs port : ${hdfs_port}"
echo "hdfs user : ${hdfs_user}"
echo "env remote file : ${hdfs_env_file}"
echo "python remote file: ${hdfs_py_file}"
echo "python remote deps: ${hdfs_py_deps}"
echo "python args : ${args}"
echo "submit mode : ${submit}"
echo "num executors : ${num_executors}"
echo "executor memory : ${executor_memory}"
echo "executor cores : ${executor_cores}"
echo "driver memory : ${driver_memory}"
echo "driver cores : ${driver_cores}"
echo "workdir : ${PWD}/${workdir}"
echo "java home : ${java_home}"
echo "force delete : ${force_delete}"
echo ""
if [[ ${submit} != "python" && ${submit} != "spark" ]]; then
echo "submit-mode inválido ${submit} utilize python ou spark"
show_help
exit 1
fi
# ----------------------------------------------------------------------------------------------- #
# Execução #
# ----------------------------------------------------------------------------------------------- #
if [[ ${submit} == python ]]; then
mkdir -p ${workdir}
cd ${workdir}
download_hdfs_file ${hdfs_env_file}
download_hdfs_file ${hdfs_py_file}
env_local_file=$(basename -- "${hdfs_env_file}")
py_local_file=$(basename -- "${hdfs_py_file}")
env_name=$(echo "${env_local_file}" | sed s/\.tar\.gz//g)
if [[ ! -z ${hdfs_py_deps} ]]; then
echo "Setting up ${hdfs_py_deps}"
download_hdfs_file ${hdfs_py_deps}
deps_local_file=$(basename -- "${hdfs_py_deps}")
x=$(unzip ${deps_local_file} >> /dev/null 2>&1)
fi
echo ""
echo "env local file: ${env_local_file}"
echo "py local file : ${py_local_file}"
if [[ ! -z ${deps_local_file} ]]; then
echo "py local deps : ${deps_local_file}"
fi
echo "env name : ${env_name}"
echo ""
mkdir ${env_name}
tar xfz ${env_local_file} -C ${env_name}
export JAVA_HOME=${java_home}
source ${env_name}/bin/activate
set -x
# conda list
python ${py_local_file} ${args}
result=${?}
set +x
source ${env_name}/bin/deactivate
cd ..
if [[ ${result} -eq 0 || ${force_delete} == "true" ]]; then
rm -rf ${workdir}
fi
else
set -x
# export PYSPARK_PYTHON=./environment/bin/python
py_deps=""
if [[ ! -z ${hdfs_py_deps} ]]; then
py_deps="--py-files hdfs://${hdfs_host}:${hdfs_port}${hdfs_py_deps}"
fi
spark-submit \
--executor-cores ${executor_cores} \
--num-executors ${num_executors} \
--driver-cores ${driver_cores} \
--driver-memory ${driver_memory} \
--executor-memory ${executor_memory} \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python \
--master yarn \
--deploy-mode cluster \
--archives "hdfs://${hdfs_host}:${hdfs_port}${hdfs_env_file}#environment" \
${py_deps} hdfs://${hdfs_host}:${hdfs_port}${hdfs_py_file} ${args}
set +x
echo ""
fi
```
O script também deve ser salvo no HDFS para que o Oozie tenha acesso. Para os próximos passos vamos considerar que ele está salvo no diretório `/scripts` com o nome `spark-submit.sh`
## 2.2 Execução no Oozie
Agora vamos configurar tudo que fizemos até aqui no Oozie para finalmente executar o nosso job.
O primeiro passo é criar um novo Workflow.

Criar um novo nó do tipo `shell`.

Editar o executor shell.

Dentro do editor vamos selecionar no campo **exec*** o script que está salvo em `/scripts/spark-submit.sh` no HDFS.
E passar os parâmetros necessários para executar o nosso job.

Outro ponto importante é que o script deve ser configurado na tab "Advanced Properties" no campo **File**.

Após salvar, vamos dar um nome ao nosso workflow.

O último passo é realizar o submit, clique no botão `Submit` no canto superior direito.
Selecione a opção "Run on submit" e desmarque a opção "Rerun on failure". Marque a opção "Overwrite" caso esteja fazendo uma atualização no seu workflow.
Observe na imagem abaixo que estamos salvando o arquivo `workflow.xml` com a definição do nosso workflow junto dos arquivos do nosso job.

Pronto, nosso job será executado no Yarn.
## 3. Yarn
Para verificar se a execução do job no Yarn devemos acessar o "Yarn Resource Manager UI", você pode acessá-lo através do link na interface do Ambari.

Na aba "Applications" expanda o *Application Name* para visualizar o seu job. Clique no link da coluna *Application ID* para mais detalhes.

**Caso receba um erro ao abrir a interface do Yarn, veja os passos da próxima seção.**
Na tela de detalhes, você deve primeiro selecionar o campo de tentativas (*attempts*), na maioria dos casos terá apenas uma entrada pois a execução funcionou na primeira tentativa, por padrão o Yarn tenta três vezes.
Depois deve escolher o container logs, como a execução é distribuída, geralmente os logs ficam divididos, selecione a última opção pois geralmente é onde está a saída do seu job. No exemplo abaixo selecionamos o segundo container (*container_e01_1596735718364_0031_01_000002*).
O log é segmentado em vários tipos: directory.info, launch_container, stderr, stdout, syslog. Os mais importantes são o stderr e stdout, no exemplo abaixo selecionamos o stdout.

Ao clicar no link **here** podemos ver o log completo, ao rolar a tela para o final pode visualizar a saída do nosso job. Observe que temos o indicativo que deu tudo certo com o *exit code* igual a zero.

## 4. Yarn com Kerberos
### 4.1 Negociação de chaves
Os principais browser possuem um mecanismo para autenticar com o Kerberos utilizando a negociação de chaves. Para detalhes sobre cada navegador, acesse esse [link](https://active-directory-wp.com/docs/Networking/Single_Sign_On/Configure_browsers_to_use_Kerberos.html).
Entretanto, caso o seu usuário do Windows não seja autenticado pelo mesmo Active Directory configurado no Yarn, a negociação da chave irá falhar, pois a chave do seu login não será válida no contexto do Yarn.
### 4.2 Acessando apenas o serviço de timeline
Caso esteja na situação que não tem acesso ao `ResourceManager` você pode optar por acessar diretamente o serviço de timeline para visualização dos logs.
O primeiro passo é desabilitar a autenticação do kerberos para esse serviço.

O segundo passo é reiniciar todo o ambiente, porque pode ser que os serviços fiquem em estado inconsistente.
Agora precisamos descobrir qual é o endereço do timeline.

Acessando o endereço obtido acima, você consegue visualizar a lista de aplicações.

Selecionando o link da imagem acima, podemos chegar nos logs.

Entrando na página do log, geralmente o último é do tipo `stdout`, veja abaixo um exemplo.
