# Argo workflow 分享 * by 季鹏飞 (pengfei.ji@galixir.com) --- # 测试环境搭建 - 分发kubeconfig,覆盖 `$HOME/.kube/config` - 资源:https://github.com/jibuji/argo-share-resource --- ## 安装 kubectl CLI - brew install kubectl --- ## 安装 argo CLI ```bash # Download the binary curl -sLO https://github.com/argoproj/argo/releases/download/v3.1.0-rc10/argo-darwin-amd64.gz # Unzip gunzip argo-darwin-amd64.gz # Make binary executable chmod +x argo-darwin-amd64 # Move binary to path mv ./argo-darwin-amd64 /usr/local/bin/argo # Test installation argo version ``` --- # 参考资源 - 官方文档 https://argoproj.github.io/argo-workflows/ - 各种例子 https://github.com/argoproj/argo-workflows/tree/master/examples - 边学边练 https://www.katacoda.com/argoproj/courses/argo-workflows/ --- # workflow 核心结构 --- The core structure of a Workflow spec is a list of templates and an entrypoint. --- Templates can be loosely thought of as "functions": they define instructions to be executed. The entrypoint field defines what the "main" function will be – that is, the template that will be executed first. --- ## 例子: hello world ```bash argo submit hello-world.yaml -n argo-test ``` --- ## 删除测试的workfow ``` argo delete @latest -n argo-test argo delete -l test=true -n argo-test argo delete --all -n argo-test ``` --- ## Template Types - Template Definitions: - CONTAINER - SCRIPT - RESOURCE - SUSPEND - Template Invocators: - STEPS - DAG --- ### CONTAINER - hello-world.yaml ```yaml - name: whalesay container: image: docker/whalesay command: [cowsay] args: ["hello world"] ``` --- ### SCRIPT ```yaml - name: gen-random-int script: image: python:alpine3.6 command: [python] source: | import random i = random.randint(1, 100) print(i) ``` --- ### RESOURCE ```yaml - name: k8s-owner-reference resource: action: create manifest: | apiVersion: v1 kind: ConfigMap metadata: generateName: owned-eg- data: some: value ``` --- ### SUSPEND ```yaml - name: delay suspend: duration: "20s" ``` --- ### STEP - step.yaml ```yaml= - name: hello-hello-hello steps: - - name: hello1 template: whalesay arguments: parameters: [{name: message, value: "hello1"}] - - name: hello2a template: whalesay arguments: parameters: [{name: message, value: "hello2a"}] - name: hello2b template: whalesay arguments: parameters: [{name: message, value: "hello2b"}] ``` --- ### DAG ``` # # A # / \ # B C # \ / # D ``` --- - dag.yaml ```yaml= - name: dag dag: tasks: - name: A template: echo arguments: parameters: [{name: message, value: A}] - name: B dependencies: [A] template: echo arguments: parameters: [{name: message, value: B}] - name: C dependencies: [A] template: echo arguments: parameters: [{name: message, value: C}] - name: D dependencies: [B, C] template: echo arguments: parameters: [{name: message, value: D}] ``` --- # Study By Example --- ## Workflow Variables The tag is substituted with the variable that has a name the same as the tag. Simple tags `may` have whitespace between the brackets and variable. ```yaml= args: [ "{{ inputs.parameters.message }}" ] ``` --- ### example: workflow-variable.yaml ```bash argo submit workflow-variable.yaml -n argo-test argo submit workflow-variable.yaml -p message='hello galixir' -n argo-test ``` --- ### example: exit-code-output-variable.yaml ```yaml= steps: - - name: failing-container template: failing-container continueOn: failed: true - - name: echo-container template: echo-container arguments: parameters: - name: exitCode value: "{{steps.failing-container.exitCode}}" ``` --- ### variable referrence https://argoproj.github.io/argo-workflows/variables/#reference --- ## Empty Dir - You should use `emptyDir`, if you are using The Kubelet executors and the K8SAPI executors ```yaml= templates: - name: main container: image: registry-vpc.cn-beijing.aliyuncs.com/galixir/whalesay:latest command: [sh, -c] args: ["cowsay hello world | tee /mnt/out/hello_world.txt"] volumeMounts: - name: out mountPath: /mnt/out volumes: - name: out emptyDir: { } ``` --- ## loops.yaml - withItems: expands a step into multiple parallel steps from the items in the list - withParam: expands a step into multiple parallel steps from the value in the parameter, which is expected to be a JSON list - withSequence: expands a step into a numeric sequence --- ```yaml= - name: B dependencies: [A] template: whalesay arguments: parameters: - {name: message, value: "{{item}}"} withItems: - foo - bar - baz ``` `loops-dag.yaml` --- ## recursion.yaml ```bash= coinflip-recursive.yaml ``` ```yaml= templates: - name: coinflip steps: - - name: flip-coin template: flip-coin - - name: heads template: heads when: "{{steps.flip-coin.outputs.result}} == heads" - name: tails template: coinflip when: "{{steps.flip-coin.outputs.result}} == tails" - name: flip-coin script: image: registry.cn-beijing.aliyuncs.com/galixir/python:latest command: [python] source: | import random result = "heads" if random.randint(0,1) == 0 else "tails" print(result) ``` --- ## timeout.yaml ```bash= dag-task-level-timeout.yaml ``` ```yaml= - name: echo timeout: "{{inputs.parameters.timeout}}" inputs: parameters: - name: timeout container: image: registry-vpc.cn-beijing.aliyuncs.com/galixir/whalesay:latest command: [sleep, "15s"] ``` --- ## suspend - use `argo suspend` ```bash= argo suspend WORKFLOW_NAME -n argo-test ``` - use suspend template ```yaml= - name: approve suspend: {} - name: delay suspend: duration: "60" # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d" ``` - resume ```bash= argo resume WORKFLOW_NAME -n argo-test ``` --- ## Workflow Templates * WorkflowTemplate is a definition of a Workflow that live in your cluster. * WorkflowTemplates also contains templates. --- * Templates in WorkflowTemplates can be referenced from within the WorkflowTemplate and from other Workflows and WorkflowTemplates on your cluster. * Any valid Workflow can be converted it to a WorkflowTemplate by substituting `kind: Workflow` to `kind: WorkflowTemplate`. --- ### workflow template example ```yaml= apiVersion: argoproj.io/v1alpha1 kind: WorkflowTemplate metadata: name: workflow-template-random-fail-template spec: templates: - name: random-fail-template retryStrategy: limit: 10 container: image: python:alpine3.6 command: [python, -c] # fail with a 66% probability args: ["import random; import sys; exit_code = random.choice([0, 1, 1]); sys.exit(exit_code)"] ``` --- ```bash= argo template create templates.yaml -n argo-test argo submit dag-use-template.yaml -n argo-test ``` --- ## Workflow Inputs --- ### Parameter Inputs A workflow provides arguments, which are passed in to the entry point template ```yaml arguments: parameters: - name: workflow-param-1 ``` --- A template defines inputs which are then provided by template callers (such as steps, dag, or even a workflow) ```yaml= - name: step-template-A inputs: parameters: - name: template-param-1 ``` --- Inputs to DAGTemplates use the arguments format ```yaml= dag: tasks: - name: step-A template: step-template-A arguments: parameters: - name: template-param-1 value: abcd ``` --- ### example: parameter-inputs.yaml ```bash= argo submit parameter-inputs.yaml -n argo-test -p workflow-param-1=value-1 ``` --- ### example: previous-step-outputs-as-inputs - How to take the output of one step and send it as the input to another step? --- Suppose our `whalesay` template defines some outputs: ```yaml= - name: whalesay outputs: parameters: - name: hello-param valueFrom: default: "Foobar" # Default value to use if retrieving valueFrom fails. If not provided workflow will fail instead path: /tmp/hello_world.txt artifacts: - name: output-artifact-1 path: /tmp/some-directory ``` --- In my STEPTemplate, I can send these outputs to another template like this: ```yaml= - name: outputs steps: - - name: generate-output template: whalesay - - name: consume-output template: print-message arguments: parameters: - name: message value: "{{steps.generate-output.outputs.parameters.hello-param}}" artifacts: - name: input-artifact-1 from: "{{steps.generate-output.outputs.artifacts.output-artifact-1}}" ``` --- ```bash= argo submit previous-step-outputs-as-inputs.yaml -n argo-test ``` --- ## cron workflow - CronWorkflow = Workflow + some specific cron options ```yaml= apiVersion: argoproj.io/v1alpha1 kind: CronWorkflow metadata: name: hello-cron spec: schedule: "* * * * *" timezone: "America/Los_Angeles" # Default to local machine timezone startingDeadlineSeconds: 0 concurrencyPolicy: "Replace" # Default to "Allow" successfulJobsHistoryLimit: 4 # Default 3 failedJobsHistoryLimit: 4 # Default 1 suspend: false # Set to "true" to suspend scheduling workflowSpec: entrypoint: whalesay templates: - name: whalesay container: image: docker/whalesay:latest command: [cowsay] args: ["🕓 hello world. Scheduled on: {{workflow.scheduledTime}}"] ``` --- - CRON Expression Format A cron expression represents a set of times, using 5 space-separated fields. Field name | Mandatory? | Allowed values | Allowed special characters ---------- | ---------- | -------------- | -------------------------- Minutes | Yes | 0-59 | * / , - Hours | Yes | 0-23 | * / , - Day of month | Yes | 1-31 | * / , - ? Month | Yes | 1-12 or JAN-DEC | * / , - Day of week | Yes | 0-6 or SUN-SAT | * / , - ? * more detail: https://pkg.go.dev/github.com/robfig/cron/v3@v3.0.1#hdr-CRON_Expression_Format --- ```bash= argo cron create cron-workflow.yaml -n argo-test argo cron list -n argo-test argo cron delete hello-world -n argo-test ``` --- ## work avoidance * 对于比较大的任务,当再次执行时,如果上次执行成功了,就不再重复执行了 * argo没有提供新的特性支持这种能力 ```yaml= script: image: registry-vpc.cn-beijing.aliyuncs.com/galixir/whalesay:latest command: - bash - -eux source: | marker=/work/markers/$(date +%Y-%m-%d)-echo-{{inputs.parameters.num}} if [ -e ${marker} ]; then echo "work already done" exit 0 fi echo "working very hard" echo "exist" > ${marker} cat ${marker} ``` ```bash= argo submit work-avoidance.yaml -n argo-test ``` --- ## enhanced-depends-logic.yaml - A task may only be relevant to run if the dependent task succeeded (or failed, etc.). ```yaml= dag: tasks: - name: A template: pass - name: B depends: A template: pass - name: C depends: A template: fail - name: should-execute-1 depends: "A && (C.Succeeded || C.Failed)" # For more information about this depends field, see: docs/enhanced-depends-logic.md template: pass - name: should-execute-2 depends: B || C template: pass - name: should-not-execute depends: B && C template: pass - name: should-execute-3 depends: should-execute-2.Succeeded || should-not-execute template: pass ``` --- ## artifact-repository.yaml ```yaml= data: oss-v1: | oss: endpoint: oss-cn-beijing-internal.aliyuncs.com bucket: pyxir-drug-discovery-platform key: test-argo-1/ # this is path in the bucket # accessKeySecret and secretKeySecret are secret selectors. # It references the k8s secret named 'my-oss-credentials'. # This secret is expected to have have the keys 'accessKey' # and 'secretKey', containing the base64 encoded credentials # to the bucket. accessKeySecret: name: argo-oss-credentials key: accessKey secretKeySecret: name: argo-oss-credentials key: secretKey ``` ```bash= kubectl create secret generic argo-oss-credentials --from-literal=accessKey=LtAI5tCfBvs3PLJ7rH2WgRCs --from-literal=secretKey='4gGnMAFFqVXJ0m9I1oBTJ7ESDSwsGn' -n argo-test ``` --- ## key-only-artifact.yaml ```yaml= - name: generate container: image: argoproj/argosay:v2 args: [ echo, hello, /mnt/file ] outputs: artifacts: - name: file path: /mnt/file s3: key: my-file - name: consume container: image: argoproj/argosay:v2 args: [cat, /tmp/file] inputs: artifacts: - name: file path: /tmp/file s3: key: my-file ``` --- ```bash= argo submit key-only-artifact.yaml -n argo-test ``` --- ## synchronize - synchronize level - workflow level - template level - synchronize type - semaphore - mutex --- ### synchronize config map ```yaml apiVersion: v1 kind: ConfigMap metadata: name: sync-limit data: template: "2" workflow: "1" ``` --- ### workflow level + semaphore - semaphore.configMapKeyRef.name 相同的workflow或者template共享同一信号量 --- ```yaml= spec: entrypoint: main synchronization: semaphore: configMapKeyRef: name: sync-limit key: workflow ``` --- ### template level + mutex - Mutex lock limits only one of the template execution across the workflows in the namespace which has same Mutex lock - mutex.name相同的workflow或者template共享同一信号量 --- ```yaml= - name: acquire-lock synchronization: mutex: name: any-mutex-name container: image: registry-vpc.cn-beijing.aliyuncs.com/galixir/whalesay:latest command: [sh, -c] args: ["sleep 10; echo acquired lock"] ``` --- ### workflow synchronize example ```bash= argo submit synchronize-template-level.yaml -n argo-test ``` --- ## workflow-notifcation.yaml ```yaml - name: exit-handler steps: - - name: notify template: send-email - name: celebrate template: celebrate when: "{{workflow.status}} == Succeeded" - name: cry template: cry when: "{{workflow.status}} != Succeeded" ``` ```bash= argo submit share/examples/workflow-notification.yaml -n argo-test ``` --- ## workflow-events.yaml ```bash= kubectl get events --sort-by=.metadata.creationTimestamp -n argo-test ``` - Workflow state change: ``` WorkflowRunning WorkflowSucceeded WorkflowFailed WorkflowTimedOut ``` - Node state change: ``` WorkflowNodeRunning WorkflowNodeSucceeded WorkflowNodeFailed WorkflowNodeError ``` --- # Automation --- ## trigger workflow from outside 比如:当往git repo上push代码时,触发某个workflow - AccessToken - https://argoproj.github.io/argo-workflows/access-token/ - Event - https://argoproj.github.io/argo-workflows/events/ - Webhooks - https://argoproj.github.io/argo-workflows/webhooks/ --- ## asynchronous-job-pattern.yaml - Want to trigger an external job from Argo ```yaml= - name: trigger-job inputs: parameters: - name: "job-cmd" value: "{{inputs.parameters.job-cmd}}" image: appropriate/curl:latest command: ["/bin/sh", "-c"] args: ["{{inputs.parameters.cmd}}"] - name: wait-completion inputs: parameters: - name: uuid suspend: {} ``` --- ## Any Questions? --- ## Thank you!
{"metaMigratedAt":"2023-06-14T14:05:08.414Z","metaMigratedFrom":"YAML","title":"Argo workflow 知识分享","breaks":true,"description":"argo workflow knowledge","contributors":"[{\"id\":\"68f063c3-6333-4dad-bae5-28a5a5e892c3\",\"add\":20692,\"del\":4155}]"}
    1335 views