# Solution - Scheduling framework
###### tags: `Assignment`, `Scheduling framework`, `Solution`

## Outline
This is the solution to [Assignment - Scheduling framework](/wQc04rR7RfWg2zfw3wSt5A), provided for reference only.

## Code Structure
``` bash
src/github.com                      # packages are rooted at $GOPATH/src
└── NTHU-LSALAB                     # import paths are relative to that root
    └── riya-scheduler
        ├── cmd                     # main program
        │   └── scheduler
        │       └── main.go         # program entry point
        ├── deploy                  # installation and deployment
        │   ├── deployment.yaml
        │   ├── rbac.yaml
        │   └── test                # test files
        │       ├── Example1
        │       │   └── prefilter_test.yaml
        │       ├── Example2
        │       │   ├── Green.yaml
        │       │   ├── Red.yaml
        │       │   └── Yallow.yaml
        │       ├── Example3
        │       │   ├── Hello.yaml
        │       │   └── Hi.yaml
        │       └── Example4
        │           └── pusheen.yaml
        ├── Dockerfile              # packages the binary into an image
        ├── go.mod                  # dependency version management
        ├── go.sum                  # dependency version management
        ├── Makefile
        ├── _output                 # executables
        │   └── bin
        │       └── riya-scheduler
        └── pkg                     # plugin implementation details
            └── plugins
                ├── register.go     # registers the implemented plugin with the default scheduler
                └── riya
                    ├── prefilter
                    │   └── prefilter.go
                    ├── queueSort
                    │   └── queueSort.go
                    └── scheduler.go

17 directories, 22 files
```

## Introduction
### Extension points
![](https://i.imgur.com/FWiREDG.png)

This solution uses the Scheduling framework to implement two extension points that meet the assignment's requirements:
1. **QueueSort**
    + Sorts the scheduling queue to decide which Pod is scheduled first
    + Compares two Pods to determine their relative order
    + Only one QueueSort plugin can be enabled at a time
2. **Prefilter**
    + Pre-processes information about the Pod
    + Checks conditions that the cluster or the Pod must satisfy

### Code Explanation
First, we configure the KubeSchedulerConfiguration resource object: enable the extension points we implement and disable the related default ones.
Here we implement a plugin named `riya` and enable its queueSort and preFilter extension points, while disabling every other queueSort plugin.

If several plugins are enabled at the same extension point, they run in this order:
+ Default plugins run first, followed by custom plugins
+ Multiple custom plugins run in the order they are listed under `enabled`

```yaml=
apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
schedulerName: riya-scheduler
leaderElection:
  leaderElect: true
  lockObjectName: riya-scheduler
  lockObjectNamespace: kube-system
plugins:
  queueSort:
    enabled:
      - name: "riya"
    disabled:
      - name: "*"
  preFilter:
    enabled:
      - name: "riya"
pluginConfig:
  - name: "riya"
    args: {"master": "master", "kubeconfig": "kubeconfig"}
```

Looking at the kube-scheduler source code, we can see that plugins are registered through the `Option` type.

<font color = gray>**cmd/kube-scheduler/app/server.go**</font>
```go
// Option configures a framework.Registry.
type Option func(framework.Registry) error

// NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
	...
}

// WithPlugin creates an Option based on plugin name and factory. Please don't remove this function: it is used to register out-of-tree plugins,
// hence there are no references to it from the kubernetes scheduler code base.
func WithPlugin(name string, factory framework.PluginFactory) Option {
	return func(registry framework.Registry) error {
		return registry.Register(name, factory)
	}
}
```

We create an `Option` instance with `WithPlugin` and pass it to `NewSchedulerCommand`, which registers the plugin with the default scheduler.

**pkg/plugins/register.go**
```go
package plugins

import (
	"github.com/NTHU-LSALAB/riya-scheduler/pkg/plugins/riya"
	"github.com/spf13/cobra"
	"k8s.io/kubernetes/cmd/kube-scheduler/app"
)

// Register builds the scheduler command with the riya plugin added to the registry.
func Register() *cobra.Command {
	return app.NewSchedulerCommand(
		app.WithPlugin(riya.Name, riya.New),
	)
}
```

The main function triggers the registration and then runs the scheduler, which makes the plugin take effect.

**cmd/scheduler/main.go**
``` go
package main

import (
	"fmt"
	"math/rand"
	"os"
	"time"

	"github.com/NTHU-LSALAB/riya-scheduler/pkg/plugins"
)

func main() {
	rand.Seed(time.Now().UTC().UnixNano())

	// Build the scheduler command with the riya plugin registered, then run it.
	cmd := plugins.Register()
	if err := cmd.Execute(); err != nil {
		_, _ = fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}
```

The `riya.New` passed to `WithPlugin` above is a function of the following `PluginFactory` type:

<font color = gray>**pkg/scheduler/framework/v1alpha1/registry.go**</font>
```go
// PluginFactory is a function that builds a plugin.
type PluginFactory = func(configuration *runtime.Unknown, f FrameworkHandle) (Plugin, error)
```

The plugin itself is implemented as follows.

**pkg/plugins/riya/scheduler.go**
```go
package riya

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/klog"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"

	// custom
	"github.com/NTHU-LSALAB/riya-scheduler/pkg/plugins/riya/prefilter"
	"github.com/NTHU-LSALAB/riya-scheduler/pkg/plugins/riya/queueSort"
)

const (
	// Name is plugin name
	Name                 = "riya"
	PodGroupName         = "podGroup"
	PodGroupMinAvailable = "minAvailable"
)

// Compile-time checks that Riya implements both extension points.
var (
	_ framework.QueueSortPlugin = &Riya{}
	_ framework.PreFilterPlugin = &Riya{}
)

// Args holds the values decoded from pluginConfig.args.
type Args struct {
	KubeConfig string `json:"kubeconfig,omitempty"`
	Master     string `json:"master,omitempty"`
}

type Riya struct {
	args      *Args
	handle    framework.FrameworkHandle
	podLister corelisters.PodLister
}

// New is the PluginFactory registered under Name.
func New(configuration *runtime.Unknown, f framework.FrameworkHandle) (framework.Plugin, error) {
	args := &Args{}
	if err := framework.DecodeInto(configuration, args); err != nil {
		return nil, err
	}
	klog.V(3).Infof("Get plugin config args: %+v", args)
	podLister := f.SharedInformerFactory().Core().V1().Pods().Lister()
	return &Riya{
		args:      args,
		handle:    f,
		podLister: podLister,
	}, nil
}

func (r *Riya) Name() string {
	return Name
}

// queueSort
func (r *Riya) Less(podInfo1, podInfo2 *framework.PodInfo) bool {
	klog.V(3).Info("---QueueSort---")
	return queueSort.Less(podInfo1, podInfo2)
}

// preFilter
func (r *Riya) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
	klog.V(3).Info("---PreFilter---")
	podGroupName, minAvailable, err := prefilter.GetPodGroupLabels(pod)
	if err != nil {
		return framework.NewStatus(framework.Error, err.Error())
	}
	// Pods without a podGroup, or with minAvailable <= 1, need no group check.
	if podGroupName == "" || minAvailable <= 1 {
		return framework.NewStatus(framework.Success, "")
	}

	// Reject the pod until enough pods of its group exist in the namespace.
	total := r.calculateTotalPods(podGroupName, pod.Namespace)
	if total < minAvailable {
		klog.V(3).Infof("The count of podGroup %v/%v/%v is not up to minAvailable(%d) in PreFilter: %d",
			pod.Namespace, podGroupName, pod.Name, minAvailable, total)
		return framework.NewStatus(framework.Unschedulable, "less than minAvailable")
	}
	return framework.NewStatus(framework.Success, "")
}

func (r *Riya) PreFilterExtensions() framework.PreFilterExtensions {
	return nil
}

// calculateTotalPods counts the pods in the namespace that carry the given podGroup label.
func (r *Riya) calculateTotalPods(podGroupName, namespace string) int {
	// TODO get the total pods from the scheduler cache and queue
	selector := labels.Set{PodGroupName: podGroupName}.AsSelector()
	pods, err := r.podLister.Pods(namespace).List(selector)
	if err != nil {
		klog.Error(err)
		return 0
	}
	return len(pods)
}
```

**pkg/plugins/riya/prefilter/prefilter.go**
```go
package prefilter

import (
	"strconv"

	v1 "k8s.io/api/core/v1"
	"k8s.io/klog"
)

const (
	PodGroupName         = "podGroup"
	PodGroupMinAvailable = "minAvailable"
)

// GetPodGroupLabels checks whether the pod belongs to a podGroup.
// If so, it returns the podGroupName and minAvailable of the podGroup.
// If not, it returns "" as podGroupName.
func GetPodGroupLabels(pod *v1.Pod) (string, int, error) {
	podGroupName, exist := pod.Labels[PodGroupName]
	if !exist || podGroupName == "" {
		return "", 0, nil
	}
	minAvailable, exist := pod.Labels[PodGroupMinAvailable]
	if !exist || minAvailable == "" {
		return "", 0, nil
	}
	minNum, err := strconv.Atoi(minAvailable)
	if err != nil {
		// A malformed minAvailable is logged and the pod is treated as ungrouped.
		klog.Errorf("GetPodGroupLabels err in riya-scheduling %v/%v : %v", pod.Namespace, pod.Name, err.Error())
		return "", 0, nil
	}
	return podGroupName, minNum, nil
}
```

**pkg/plugins/riya/queueSort/queueSort.go**
```go
package queueSort

import (
	"strconv"

	v1 "k8s.io/api/core/v1"
	"k8s.io/klog"
	"k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// Less orders pods by the groupPriority label (higher first) and breaks ties by QoS class.
func Less(podInfo1, podInfo2 *framework.PodInfo) bool {
	pw1 := getPodPriority(podInfo1)
	pw2 := getPodPriority(podInfo2)
	klog.V(3).Infof("The group Priority %v/%v is %d", podInfo1.Pod.Namespace, podInfo1.Pod.Name, pw1)
	klog.V(3).Infof("The group Priority %v/%v is %d", podInfo2.Pod.Namespace, podInfo2.Pod.Name, pw2)
	klog.V(3).Infof("The group Priority %v/%v is greater than the group Priority %v/%v ? %v",
		podInfo1.Pod.Namespace, podInfo1.Pod.Name, podInfo2.Pod.Namespace, podInfo2.Pod.Name, (pw1 > pw2))
	klog.V(3).Infof("QOS %v vs %v : %v", podInfo1.Pod.Name, podInfo2.Pod.Name, compareQOS(podInfo1.Pod, podInfo2.Pod))
	return (pw1 > pw2) || (pw1 == pw2 && compareQOS(podInfo1.Pod, podInfo2.Pod))
}

// getPodPriority reads the groupPriority label; a missing label yields 0.
func getPodPriority(podInfo *framework.PodInfo) int64 {
	var priority int64
	if val, ok := podInfo.Pod.Labels["groupPriority"]; ok {
		// The parse error is ignored, so a non-numeric label also yields 0.
		priority, _ = strconv.ParseInt(val, 10, 64)
	}
	return priority
}

// compareQOS prefers Guaranteed over Burstable over BestEffort.
func compareQOS(p1, p2 *v1.Pod) bool {
	pq1, pq2 := qos.GetPodQOS(p1), qos.GetPodQOS(p2)
	klog.V(3).Infof("The QOS %v/%v is %v", p1.Namespace, p1.Name, pq1)
	klog.V(3).Infof("The QOS %v/%v is %v", p2.Namespace, p2.Name, pq2)
	if pq1 == v1.PodQOSGuaranteed {
		return true
	} else if pq1 == v1.PodQOSBurstable {
		return pq2 != v1.PodQOSGuaranteed
	} else {
		return pq2 == v1.PodQOSBestEffort
	}
}
```

## Build
Compile the scheduler and write the executable to the `_output/bin/` directory:
```bash=
$ go build -o=_output/bin/riya-scheduler ./cmd/scheduler
```
Build the image from the Dockerfile:
```bash=
$ sudo docker build --no-cache . -t riyazhu/riya-scheduler
```
```Dockerfile
FROM debian:stretch-slim
WORKDIR /
COPY _output/bin/riya-scheduler /usr/local/bin
CMD ["riya-scheduler"]
```
Push the image to Docker Hub:
```bash=
$ sudo docker push riyazhu/riya-scheduler
```

### Deployment
```bash=
$ kubectl apply -f {filename}
```
Set up the RBAC permissions.

**deploy/rbac.yaml**
:::spoiler
```yaml=
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: riya-cr
rules:
  - apiGroups:
      - ""
    resources:
      - endpoints
      - events
    verbs:
      - create
      - get
      - update
  - apiGroups:
      - ""
    resourceNames:
      - riya-scheduler
    resources:
      - endpoints
    verbs:
      - delete
      - get
      - patch
      - update
  - apiGroups:
      - ""
    resources:
      - nodes
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - ""
    resources:
      - pods
    verbs:
      - delete
      - get
      - list
      - watch
      - update
  - apiGroups:
      - ""
    resources:
      - bindings
      - pods/binding
    verbs:
      - create
  - apiGroups:
      - ""
    resources:
      - pods/status
    verbs:
      - patch
      - update
  - apiGroups:
      - ""
    resources:
      - replicationcontrollers
      - services
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - apps
      - extensions
    resources:
      - replicasets
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - apps
    resources:
      - statefulsets
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - policy
    resources:
      - poddisruptionbudgets
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - ""
    resources:
      - persistentvolumeclaims
      - persistentvolumes
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - ""
    resources:
      - configmaps
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - "storage.k8s.io"
    resources:
      - storageclasses
      - csinodes
    verbs:
      - watch
      - list
      - get
  - apiGroups:
      - "coordination.k8s.io"
    resources:
      - leases
    verbs:
      - create
      - get
      - list
      - update
  - apiGroups:
      - "events.k8s.io"
    resources:
      - events
    verbs:
      - create
      - patch
      - update
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: riya-sa
  namespace: kube-system
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: riya-crb
  namespace: kube-system
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: riya-cr
subjects:
  - kind: ServiceAccount
    name: riya-sa
    namespace: kube-system
```
:::

**deploy/deployment.yaml**
:::spoiler
```yaml=
apiVersion: v1
kind: ConfigMap
metadata:
  name: scheduler-config
  namespace: kube-system
data:
  scheduler-config.yaml: |
    apiVersion: kubescheduler.config.k8s.io/v1alpha1
    kind: KubeSchedulerConfiguration
    schedulerName: riya-scheduler
    leaderElection:
      leaderElect: true
      lockObjectName: riya-scheduler
      lockObjectNamespace: kube-system
    plugins:
      queueSort:
        enabled:
          - name: "riya"
        disabled:
          - name: "*"
      preFilter:
        enabled:
          - name: "riya"
      # filter:
      #   enabled:
      #     - name: "riya"
    pluginConfig:
      - name: "riya"
        args: {"master": "master", "kubeconfig": "kubeconfig"}
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: riya-scheduler
  namespace: kube-system
  labels:
    component: riya-scheduler
spec:
  replicas: 1
  selector:
    matchLabels:
      component: riya-scheduler
  template:
    metadata:
      labels:
        component: riya-scheduler
    spec:
      serviceAccount: riya-sa
      priorityClassName: system-cluster-critical
      volumes:
        - name: scheduler-config
          configMap:
            name: scheduler-config
      containers:
        - name: riya-scheduler
          image: riyazhu/riya-scheduler:latest
          imagePullPolicy: Always
          args:
            - riya-scheduler
            - --config=/scheduler/scheduler-config.yaml
            - --v=3
          resources:
            requests:
              cpu: "50m"
          volumeMounts:
            - name: scheduler-config
              mountPath: /scheduler
```
:::

## Reference
+ https://kubernetes.io/docs/concepts/scheduling-eviction/scheduling-framework/
+ https://github.com/kubernetes/enhancements/blob/master/keps/sig-scheduling/20180409-scheduling-framework.md
+ https://www.qikqiak.com/post/custom-kube-scheduler/
+ https://github.com/NJUPT-ISL/Yoda-Scheduler