# Argo workflow 分享
* by 季鹏飞 (pengfei.ji@galixir.com)
---
# Test Environment Setup
- Distribute the kubeconfig and overwrite `$HOME/.kube/config`
- Resources: https://github.com/jibuji/argo-share-resource
---
## Install the kubectl CLI
- `brew install kubectl`
---
## Install the argo CLI
```bash
# Download the binary
curl -sLO https://github.com/argoproj/argo/releases/download/v3.1.0-rc10/argo-darwin-amd64.gz
# Unzip
gunzip argo-darwin-amd64.gz
# Make binary executable
chmod +x argo-darwin-amd64
# Move binary to path
mv ./argo-darwin-amd64 /usr/local/bin/argo
# Test installation
argo version
```
---
# References
- Official docs: https://argoproj.github.io/argo-workflows/
- Examples: https://github.com/argoproj/argo-workflows/tree/master/examples
- Hands-on tutorials: https://www.katacoda.com/argoproj/courses/argo-workflows/
---
# Workflow Core Structure
---
The core structure of a Workflow spec is a list of templates and an entrypoint.
---
Templates can be loosely thought of as "functions": they define instructions to be executed. The entrypoint field defines what the "main" function will be – that is, the template that will be executed first.
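A minimal Workflow spec showing both pieces (this is essentially the hello-world example submitted below):
```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: hello-world-    # the Workflow name is generated with this prefix
spec:
  entrypoint: whalesay          # the "main" function: the template executed first
  templates:
  - name: whalesay              # a "function" definition
    container:
      image: docker/whalesay
      command: [cowsay]
      args: ["hello world"]
```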
---
## Example: hello world
```bash
argo submit hello-world.yaml -n argo-test
```
---
## Delete test workflows
```bash
argo delete @latest -n argo-test
argo delete -l test=true -n argo-test
argo delete --all -n argo-test
```
---
## Template Types
- Template Definitions:
  - CONTAINER
  - SCRIPT
  - RESOURCE
  - SUSPEND
- Template Invocators:
  - STEPS
  - DAG
---
### CONTAINER
- hello-world.yaml
```yaml
- name: whalesay
  container:
    image: docker/whalesay
    command: [cowsay]
    args: ["hello world"]
```
---
### SCRIPT
```yaml
- name: gen-random-int
  script:
    image: python:alpine3.6
    command: [python]
    source: |
      import random
      i = random.randint(1, 100)
      print(i)
```
---
### RESOURCE
```yaml
- name: k8s-owner-reference
  resource:
    action: create
    manifest: |
      apiVersion: v1
      kind: ConfigMap
      metadata:
        generateName: owned-eg-
      data:
        some: value
```
---
### SUSPEND
```yaml
- name: delay
  suspend:
    duration: "20s"
```
---
### STEPS
- step.yaml
```yaml=
- name: hello-hello-hello
  steps:
  - - name: hello1
      template: whalesay
      arguments:
        parameters: [{name: message, value: "hello1"}]
  - - name: hello2a
      template: whalesay
      arguments:
        parameters: [{name: message, value: "hello2a"}]
    - name: hello2b
      template: whalesay
      arguments:
        parameters: [{name: message, value: "hello2b"}]
```
---
### DAG
```
#
# A
# / \
# B C
# \ /
# D
```
---
- dag.yaml
```yaml=
- name: dag
  dag:
    tasks:
    - name: A
      template: echo
      arguments:
        parameters: [{name: message, value: A}]
    - name: B
      dependencies: [A]
      template: echo
      arguments:
        parameters: [{name: message, value: B}]
    - name: C
      dependencies: [A]
      template: echo
      arguments:
        parameters: [{name: message, value: C}]
    - name: D
      dependencies: [B, C]
      template: echo
      arguments:
        parameters: [{name: message, value: D}]
```
---
# Study By Example
---
## Workflow Variables
A tag is substituted with the variable whose name matches the tag.
Simple tags may have whitespace between the brackets and the variable name.
```yaml=
args: [ "{{ inputs.parameters.message }}" ]
```
---
### example: workflow-variable.yaml
```bash
argo submit workflow-variable.yaml -n argo-test
argo submit workflow-variable.yaml -p message='hello galixir' -n argo-test
```
---
### example: exit-code-output-variable.yaml
```yaml=
steps:
- - name: failing-container
    template: failing-container
    continueOn:
      failed: true
- - name: echo-container
    template: echo-container
    arguments:
      parameters:
      - name: exitCode
        value: "{{steps.failing-container.exitCode}}"
```
---
### variable reference
https://argoproj.github.io/argo-workflows/variables/#reference
---
## Empty Dir
- You should use `emptyDir` if you are using the Kubelet or K8SAPI executors, since those executors cannot read outputs from the container's base image layer
```yaml=
templates:
- name: main
  container:
    image: registry-vpc.cn-beijing.aliyuncs.com/galixir/whalesay:latest
    command: [sh, -c]
    args: ["cowsay hello world | tee /mnt/out/hello_world.txt"]
    volumeMounts:
    - name: out
      mountPath: /mnt/out
  volumes:
  - name: out
    emptyDir: { }
```
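The point of the `emptyDir` volume is that later outputs can be read back from it; a minimal sketch of a matching `outputs` section (the parameter name is illustrative):
```yaml
- name: main
  # ...container and volumes as above...
  outputs:
    parameters:
    - name: hello                        # illustrative parameter name
      valueFrom:
        path: /mnt/out/hello_world.txt   # read back from the emptyDir mount
```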
---
## loops.yaml
- withItems: expands a step into multiple parallel steps from the items in the list
- withParam: expands a step into multiple parallel steps from the value in the parameter, which is expected to be a JSON list
- withSequence: expands a step into a numeric sequence
---
```yaml=
- name: B
  dependencies: [A]
  template: whalesay
  arguments:
    parameters:
    - {name: message, value: "{{item}}"}
  withItems:
  - foo
  - bar
  - baz
```
`loops-dag.yaml`
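The block above uses `withItems`; for completeness, rough sketches of `withParam` and `withSequence` (the `gen-list` task and all names are illustrative):
```yaml
# withParam: expands from a parameter that must resolve to a JSON list, e.g. ["foo","bar"]
- name: expand-from-param
  dependencies: [gen-list]                         # gen-list: an illustrative task that prints the list
  template: whalesay
  arguments:
    parameters:
    - {name: message, value: "{{item}}"}
  withParam: "{{tasks.gen-list.outputs.result}}"

# withSequence: expands into a numeric sequence
- name: expand-from-sequence
  template: whalesay
  arguments:
    parameters:
    - {name: message, value: "{{item}}"}
  withSequence:
    count: "5"   # items 0..4; start/end/format are also supported
```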
---
## recursion.yaml
- coinflip-recursive.yaml
```yaml=
templates:
- name: coinflip
  steps:
  - - name: flip-coin
      template: flip-coin
  - - name: heads
      template: heads
      when: "{{steps.flip-coin.outputs.result}} == heads"
    - name: tails
      template: coinflip
      when: "{{steps.flip-coin.outputs.result}} == tails"
- name: flip-coin
  script:
    image: registry.cn-beijing.aliyuncs.com/galixir/python:latest
    command: [python]
    source: |
      import random
      result = "heads" if random.randint(0,1) == 0 else "tails"
      print(result)
```
---
## timeout.yaml
- dag-task-level-timeout.yaml
```yaml=
- name: echo
  timeout: "{{inputs.parameters.timeout}}"
  inputs:
    parameters:
    - name: timeout
  container:
    image: registry-vpc.cn-beijing.aliyuncs.com/galixir/whalesay:latest
    command: [sleep, "15s"]
```
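`timeout` above is per-template; to cap a whole workflow by duration, `activeDeadlineSeconds` can be set on the spec (a sketch, the value is arbitrary):
```yaml
spec:
  activeDeadlineSeconds: 300   # fail the whole workflow if it runs longer than 5 minutes
```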
---
## suspend
- use `argo suspend`
```bash=
argo suspend WORKFLOW_NAME -n argo-test
```
- use suspend template
```yaml=
- name: approve
  suspend: {}
- name: delay
  suspend:
    duration: "60"   # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
```
- resume
```bash=
argo resume WORKFLOW_NAME -n argo-test
```
---
## Workflow Templates
* A WorkflowTemplate is a definition of a Workflow that lives in your cluster.
* WorkflowTemplates also contain templates.
---
* Templates in WorkflowTemplates can be referenced from within the WorkflowTemplate and from other Workflows and WorkflowTemplates on your cluster.
* Any valid Workflow can be converted to a WorkflowTemplate by replacing `kind: Workflow` with `kind: WorkflowTemplate`.
---
### workflow template example
```yaml=
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: workflow-template-random-fail-template
spec:
  templates:
  - name: random-fail-template
    retryStrategy:
      limit: 10
    container:
      image: python:alpine3.6
      command: [python, -c]
      # fail with a 66% probability
      args: ["import random; import sys; exit_code = random.choice([0, 1, 1]); sys.exit(exit_code)"]
```
---
```bash=
argo template create templates.yaml -n argo-test
argo submit dag-use-template.yaml -n argo-test
```
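`dag-use-template.yaml` is not shown here, but referencing the stored template from another workflow looks roughly like this (the task name is illustrative):
```yaml
dag:
  tasks:
  - name: call-random-fail                            # illustrative task name
    templateRef:
      name: workflow-template-random-fail-template    # the WorkflowTemplate created above
      template: random-fail-template                  # the template inside it
```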
---
## Workflow Inputs
---
### Parameter Inputs
A workflow provides arguments, which are passed to the entrypoint template:
```yaml
arguments:
  parameters:
  - name: workflow-param-1
```
---
A template defines inputs, which are then provided by template callers (such as steps, DAG tasks, or the workflow itself):
```yaml=
- name: step-template-A
  inputs:
    parameters:
    - name: template-param-1
```
---
DAG tasks (and steps) pass values to those inputs using the `arguments` format:
```yaml=
dag:
  tasks:
  - name: step-A
    template: step-template-A
    arguments:
      parameters:
      - name: template-param-1
        value: abcd
```
---
### example: parameter-inputs.yaml
```bash=
argo submit parameter-inputs.yaml -n argo-test -p workflow-param-1=value-1
```
---
### example: previous-step-outputs-as-inputs
- How to take the output of one step and send it as the input to another step?
---
Suppose our `whalesay` template defines some outputs:
```yaml=
- name: whalesay
  outputs:
    parameters:
    - name: hello-param
      valueFrom:
        default: "Foobar"   # Default value to use if retrieving valueFrom fails. If not provided, the workflow fails instead
        path: /tmp/hello_world.txt
    artifacts:
    - name: output-artifact-1
      path: /tmp/some-directory
```
---
In a steps template, these outputs can be passed to another template like this:
```yaml=
- name: outputs
  steps:
  - - name: generate-output
      template: whalesay
  - - name: consume-output
      template: print-message
      arguments:
        parameters:
        - name: message
          value: "{{steps.generate-output.outputs.parameters.hello-param}}"
        artifacts:
        - name: input-artifact-1
          from: "{{steps.generate-output.outputs.artifacts.output-artifact-1}}"
```
---
```bash=
argo submit previous-step-outputs-as-inputs.yaml -n argo-test
```
---
## cron workflow
- CronWorkflow = Workflow + some specific cron options
```yaml=
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: hello-cron
spec:
  schedule: "* * * * *"
  timezone: "America/Los_Angeles"   # Defaults to the local machine's timezone
  startingDeadlineSeconds: 0
  concurrencyPolicy: "Replace"      # Defaults to "Allow"
  successfulJobsHistoryLimit: 4     # Defaults to 3
  failedJobsHistoryLimit: 4         # Defaults to 1
  suspend: false                    # Set to "true" to suspend scheduling
  workflowSpec:
    entrypoint: whalesay
    templates:
    - name: whalesay
      container:
        image: docker/whalesay:latest
        command: [cowsay]
        args: ["🕓 hello world. Scheduled on: {{workflow.scheduledTime}}"]
```
---
- CRON Expression Format
A cron expression represents a set of times, using 5 space-separated fields.
Field name | Mandatory? | Allowed values | Allowed special characters
---------- | ---------- | -------------- | --------------------------
Minutes | Yes | 0-59 | * / , -
Hours | Yes | 0-23 | * / , -
Day of month | Yes | 1-31 | * / , - ?
Month | Yes | 1-12 or JAN-DEC | * / , -
Day of week | Yes | 0-6 or SUN-SAT | * / , - ?
* more detail:
https://pkg.go.dev/github.com/robfig/cron/v3@v3.0.1#hdr-CRON_Expression_Format
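For example, a schedule that fires at 02:30 on weekdays:
```yaml
# minute hour day-of-month month day-of-week
schedule: "30 2 * * MON-FRI"   # 02:30 every Monday through Friday
```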
---
```bash=
argo cron create cron-workflow.yaml -n argo-test
argo cron list -n argo-test
argo cron delete hello-cron -n argo-test
```
---
## work avoidance
* For expensive tasks: if a previous run already succeeded, skip the work on the next run
* Argo has no dedicated feature for this; it is done with a marker file
```yaml=
script:
  image: registry-vpc.cn-beijing.aliyuncs.com/galixir/whalesay:latest
  command:
  - bash
  - -eux
  source: |
    marker=/work/markers/$(date +%Y-%m-%d)-echo-{{inputs.parameters.num}}
    if [ -e ${marker} ]; then
      echo "work already done"
      exit 0
    fi
    echo "working very hard"
    echo "exist" > ${marker}
    cat ${marker}
```
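The marker only helps if `/work` outlives the pod, so it should sit on persistent storage; one way to do this, assuming a pre-created PVC (the claim and mount names are assumptions):
```yaml
spec:
  entrypoint: main
  volumes:
  - name: work
    persistentVolumeClaim:
      claimName: work-markers        # assumption: a PVC created ahead of time
  # and inside the script template shown above:
  #   volumeMounts:
  #   - name: work
  #     mountPath: /work             # so /work/markers/... survives across workflow runs
```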
```bash=
argo submit work-avoidance.yaml -n argo-test
```
---
## enhanced-depends-logic.yaml
- A task may only be relevant to run if the dependent task succeeded (or failed, etc.).
```yaml=
dag:
  tasks:
  - name: A
    template: pass
  - name: B
    depends: A
    template: pass
  - name: C
    depends: A
    template: fail
  - name: should-execute-1
    depends: "A && (C.Succeeded || C.Failed)"   # For more information about this depends field, see: docs/enhanced-depends-logic.md
    template: pass
  - name: should-execute-2
    depends: B || C
    template: pass
  - name: should-not-execute
    depends: B && C
    template: pass
  - name: should-execute-3
    depends: should-execute-2.Succeeded || should-not-execute
    template: pass
```
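Note: a bare task name in `depends` (as in `B || C` above) is shorthand; per the Argo docs it expands as follows:
```yaml
depends: B
# ...is shorthand for:
# depends: "B.Succeeded || B.Skipped || B.Daemoned"
```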
---
## artifact-repository.yaml
```yaml=
data:
  oss-v1: |
    oss:
      endpoint: oss-cn-beijing-internal.aliyuncs.com
      bucket: pyxir-drug-discovery-platform
      key: test-argo-1/   # path prefix inside the bucket
      # accessKeySecret and secretKeySecret are secret selectors.
      # They reference the k8s secret named 'argo-oss-credentials',
      # which is expected to have the keys 'accessKey' and 'secretKey'
      # containing the credentials for the bucket.
      accessKeySecret:
        name: argo-oss-credentials
        key: accessKey
      secretKeySecret:
        name: argo-oss-credentials
        key: secretKey
```
```bash=
kubectl create secret generic argo-oss-credentials --from-literal=accessKey=<your-access-key> --from-literal=secretKey='<your-secret-key>' -n argo-test
```
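A workflow selects this repository entry via `artifactRepositoryRef` (the ConfigMap name here is an assumption; only its `data` section is shown above):
```yaml
spec:
  artifactRepositoryRef:
    configMap: artifact-repositories   # assumption: the ConfigMap holding the oss-v1 entry
    key: oss-v1
```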
---
## key-only-artifact.yaml
```yaml=
- name: generate
  container:
    image: argoproj/argosay:v2
    args: [ echo, hello, /mnt/file ]
  outputs:
    artifacts:
    - name: file
      path: /mnt/file
      s3:
        key: my-file
- name: consume
  container:
    image: argoproj/argosay:v2
    args: [cat, /tmp/file]
  inputs:
    artifacts:
    - name: file
      path: /tmp/file
      s3:
        key: my-file
```
---
```bash=
argo submit key-only-artifact.yaml -n argo-test
```
---
## synchronization
- synchronization levels
  - workflow level
  - template level
- synchronization types
  - semaphore
  - mutex
---
### synchronization ConfigMap
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: sync-limit
data:
  template: "2"
  workflow: "1"
```
---
### workflow level + semaphore
- Workflows (or templates) that reference the same `semaphore.configMapKeyRef` share the same semaphore
---
```yaml=
spec:
  entrypoint: main
  synchronization:
    semaphore:
      configMapKeyRef:
        name: sync-limit
        key: workflow
```
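The same semaphore mechanism also works at the template level, sketched here with the `template` key of the `sync-limit` ConfigMap (the template name is illustrative):
```yaml
- name: acquire-semaphore              # illustrative template name
  synchronization:
    semaphore:
      configMapKeyRef:
        name: sync-limit
        key: template                  # at most 2 instances run concurrently
  container:
    image: registry-vpc.cn-beijing.aliyuncs.com/galixir/whalesay:latest
    command: [sh, -c]
    args: ["sleep 10; echo done"]
```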
---
### template level + mutex
- A mutex limits execution to one instance of the template at a time, across all workflows in the namespace that hold the same mutex
- Workflows or templates with the same `mutex.name` share the same lock
---
```yaml=
- name: acquire-lock
  synchronization:
    mutex:
      name: any-mutex-name
  container:
    image: registry-vpc.cn-beijing.aliyuncs.com/galixir/whalesay:latest
    command: [sh, -c]
    args: ["sleep 10; echo acquired lock"]
```
---
### synchronization example
```bash=
argo submit synchronize-template-level.yaml -n argo-test
```
---
## workflow-notification.yaml
```yaml
- name: exit-handler
  steps:
  - - name: notify
      template: send-email
    - name: celebrate
      template: celebrate
      when: "{{workflow.status}} == Succeeded"
    - name: cry
      template: cry
      when: "{{workflow.status}} != Succeeded"
```
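The exit handler is attached to the workflow with the `onExit` field, so it runs whether the workflow succeeds or fails; a sketch (the entrypoint name is illustrative):
```yaml
spec:
  entrypoint: main            # illustrative entrypoint
  onExit: exit-handler        # runs after the workflow completes, regardless of outcome
```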
```bash=
argo submit share/examples/workflow-notification.yaml -n argo-test
```
---
## workflow-events.yaml
```bash=
kubectl get events --sort-by=.metadata.creationTimestamp -n argo-test
```
- Workflow state change:
```
WorkflowRunning
WorkflowSucceeded
WorkflowFailed
WorkflowTimedOut
```
- Node state change:
```
WorkflowNodeRunning
WorkflowNodeSucceeded
WorkflowNodeFailed
WorkflowNodeError
```
---
# Automation
---
## trigger workflow from outside
For example: trigger a workflow when code is pushed to a git repo.
- AccessToken
- https://argoproj.github.io/argo-workflows/access-token/
- Event
- https://argoproj.github.io/argo-workflows/events/
- Webhooks
- https://argoproj.github.io/argo-workflows/webhooks/
---
## asynchronous-job-pattern.yaml
- Trigger an external job from Argo, then suspend until the job finishes and the workflow is resumed from outside
```yaml=
- name: trigger-job
  inputs:
    parameters:
    - name: job-cmd
  container:
    image: appropriate/curl:latest
    command: ["/bin/sh", "-c"]
    args: ["{{inputs.parameters.job-cmd}}"]
- name: wait-completion
  inputs:
    parameters:
    - name: uuid
  suspend: {}
```
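A rough sketch of wiring the two templates together in a DAG (the external URL and the use of the trigger step's stdout as the job id are assumptions):
```yaml
- name: main
  dag:
    tasks:
    - name: trigger
      template: trigger-job
      arguments:
        parameters:
        - {name: job-cmd, value: "curl -s http://external-job-service/start"}   # assumption
    - name: wait
      dependencies: [trigger]
      template: wait-completion
      arguments:
        parameters:
        - {name: uuid, value: "{{tasks.trigger.outputs.result}}"}   # stdout of the trigger container
    # the workflow suspends at wait-completion until it is resumed externally,
    # e.g. with `argo resume` or via the Argo Server API
```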
---
## Any Questions?
---
## Thank you!
{"metaMigratedAt":"2023-06-14T14:05:08.414Z","metaMigratedFrom":"YAML","title":"Argo workflow 知识分享","breaks":true,"description":"argo workflow knowledge","contributors":"[{\"id\":\"68f063c3-6333-4dad-bae5-28a5a5e892c3\",\"add\":20692,\"del\":4155}]"}