# flytekit agent databricks discussion
There are 2 ways to create a cluster and execute a task.
## First way: 4 steps
The code will look like this:
1. Create a cluster
```python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()  # host and token come from the environment / config

clstr = w.clusters.create_and_wait(
    cluster_name=custom["cluster_name"],
    docker_image=docker_image,
    spark_version=custom["spark_version"],
    node_type_id=custom["node_type_id"],
    autotermination_minutes=custom["autotermination_minutes"],
    num_workers=custom["num_workers"],
)
```
2. Get the cluster ID
```python
cluster_id = clstr.cluster_id # important metadata
```
3. Define the task (use the cluster ID from step 2)
```python
from databricks.sdk.service import jobs

tasks = [
    jobs.Task(
        description=custom["description"],
        existing_cluster_id=cluster_id,  # the cluster created in step 1
        spark_python_task=jobs.SparkPythonTask(python_file=custom["python_file"]),
        task_key=custom["task_key"],  # metadata
        timeout_seconds=custom["timeout_seconds"],  # metadata
    )
]
```
4. Submit the run
```python
run = w.jobs.submit(
    run_name=custom["cluster_name"],  # metadata
    tasks=tasks,
).result()  # blocks until the run finishes
```
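The identifiers the agent needs later come out of these calls. A rough guess at the wiring (the note below only says that get and delete need `job_id` and `cluster_id`):
```python
# presumably the run id returned by submit is the job_id the agent stores
job_id = run.run_id  # needed by get and delete in both ways
# cluster_id from step 2 is also kept, since delete must tear the cluster down
```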
## Second way: 2 steps
The code will look like this:
1. Define the task (pass the `new_cluster` argument instead of an existing cluster ID)
```python
from databricks.sdk.service import compute, jobs

tasks = [
    jobs.Task(
        description=custom["description"],
        spark_python_task=jobs.SparkPythonTask(python_file=custom["python_file"]),
        task_key=custom["task_key"],  # metadata
        new_cluster=compute.ClusterSpec(  # cluster created on demand for this run
            spark_version=custom["spark_version"],
            node_type_id=custom["node_type_id"],
            num_workers=custom["num_workers"],
        ),
        timeout_seconds=custom["timeout_seconds"],  # metadata
    )
]
```
2. Submit the run
```python
run = w.jobs.submit(
    run_name=custom["cluster_name"],  # metadata
    tasks=tasks,
).result()
```
## Compare the pros and cons
We have to implement 3 main functions in the agent.py file for Databricks: create, get and delete.
For the create function, the 2nd way is easier to implement.
For the get function, both ways only need the host, the token and the job_id.
For the delete function, the 1st way is easier to implement, since we have to delete the job by job_id and the cluster by cluster_id.
Note: I don't know whether the cluster will be deleted if I use the 2nd way, because I only have the job_id to delete the job. See the sketch below.
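A minimal sketch of what get and delete could look like with the Python SDK, assuming the agent stored `job_id` (the run id from submit) and, for the 1st way, `cluster_id`; whether cancelling the run is the right semantics for delete is an assumption here, not something the SDK dictates:
```python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient(host=host, token=token)

# get: both ways only need host, token and job_id
run = w.jobs.get_run(run_id=job_id)
print(run.state.life_cycle_state, run.state.result_state)

# delete: cancel the run, then (1st way only) remove the cluster we created
w.jobs.cancel_run(run_id=job_id)
w.clusters.permanent_delete(cluster_id=cluster_id)  # only possible if cluster_id was kept
```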
## Problem I've faced
Since we pass arguments into the Python SDK functions,
this solution is not as flexible as calling the Databricks POST API directly.
We have to map every argument onto the function. For example, the submit function above:
```python
def submit(self,
           *,
           access_control_list: Optional[List[iam.AccessControlRequest]] = None,
           email_notifications: Optional[JobEmailNotifications] = None,
           git_source: Optional[GitSource] = None,
           health: Optional[JobsHealthRules] = None,
           idempotency_token: Optional[str] = None,
           notification_settings: Optional[JobNotificationSettings] = None,
           run_name: Optional[str] = None,
           tasks: Optional[List[SubmitTask]] = None,
           timeout_seconds: Optional[int] = None,
           webhook_notifications: Optional[WebhookNotifications] = None) -> Wait[Run]:
```
has 10+ arguments, and we have to wire them all through.
And the Task dataclass looks like this:
```python
@dataclass
class Task:
    task_key: str
    compute_key: Optional[str] = None
    condition_task: Optional['ConditionTask'] = None
    dbt_task: Optional['DbtTask'] = None
    depends_on: Optional['List[TaskDependency]'] = None
    description: Optional[str] = None
    email_notifications: Optional['TaskEmailNotifications'] = None
    existing_cluster_id: Optional[str] = None
    health: Optional['JobsHealthRules'] = None
    job_cluster_key: Optional[str] = None
    libraries: Optional['List[compute.Library]'] = None
    max_retries: Optional[int] = None
    min_retry_interval_millis: Optional[int] = None
    new_cluster: Optional['compute.ClusterSpec'] = None
    notebook_task: Optional['NotebookTask'] = None
    notification_settings: Optional['TaskNotificationSettings'] = None
    pipeline_task: Optional['PipelineTask'] = None
    python_wheel_task: Optional['PythonWheelTask'] = None
    retry_on_timeout: Optional[bool] = None
    run_if: Optional['RunIf'] = None
    run_job_task: Optional['RunJobTask'] = None
    spark_jar_task: Optional['SparkJarTask'] = None
    spark_python_task: Optional['SparkPythonTask'] = None
    spark_submit_task: Optional['SparkSubmitTask'] = None
    sql_task: Optional['SqlTask'] = None
    timeout_seconds: Optional[int] = None
```
so there are 20+ arguments to wire through as well.
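For contrast, a rough sketch of the POST-API approach, assuming `custom` already mirrors the Jobs API 2.1 `runs/submit` request body, so new fields pass straight through without touching the agent:
```python
import requests

resp = requests.post(
    f"{host}/api/2.1/jobs/runs/submit",
    headers={"Authorization": f"Bearer {token}"},
    json=custom,  # whatever the user put in custom is forwarded as-is
)
resp.raise_for_status()
job_id = resp.json()["run_id"]
```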