# Solution - Scheduling framework
###### tags: `Assignment`, `Scheduling framework`, `Solution`
## Outline
This is a solution to [Assignment - Scheduling framework](/wQc04rR7RfWg2zfw3wSt5A), provided for reference only.
## Code Structure
``` bash
src/github.com                  # packages are rooted at $GOPATH/src
└── NTHU-LSALAB                 # import paths are relative to that root
    └── riya-scheduler
        ├── cmd                 # main program
        │   └── scheduler
        │       └── main.go     # program entry point
        ├── deploy              # deployment manifests
        │   ├── deployment.yaml
        │   ├── rbac.yaml
        │   └── test            # test files
        │       ├── Example1
        │       │   └── prefilter_test.yaml
        │       ├── Example2
        │       │   ├── Green.yaml
        │       │   ├── Red.yaml
        │       │   └── Yallow.yaml
        │       ├── Example3
        │       │   ├── Hello.yaml
        │       │   └── Hi.yaml
        │       └── Example4
        │           └── pusheen.yaml
        ├── Dockerfile          # builds the container image
        ├── go.mod              # module and dependency versions
        ├── go.sum              # dependency checksums
        ├── Makefile
        ├── _output             # compiled binary
        │   └── bin
        │       └── riya-scheduler
        └── pkg                 # plugin implementation
            └── plugins
                ├── register.go # registers the plugin with the default scheduler
                └── riya
                    ├── prefilter
                    │   └── prefilter.go
                    ├── queueSort
                    │   └── queueSort.go
                    └── scheduler.go

17 directories, 22 files
```
## Introduction
### Extension points

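To meet the assignment's requirements, we implement two extension points of the Scheduling framework:
1. **QueueSort**
   + Sorts the Pods in the scheduling queue to decide which Pod is scheduled first
   + Works by comparing two Pods and deciding which one goes first
   + Only one QueueSort plugin can be enabled at a time
2. **PreFilter**
   + Preprocesses information about the Pod
   + Checks conditions that the cluster or the Pod must satisfy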
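To make the division of labour between the two extension points concrete, here is a minimal, self-contained model of one scheduling pass. `Pod`, `QueueSortPlugin`, `PreFilterPlugin`, `demoPlugin` and `runCycle` are hypothetical simplifications, not the real `framework` types (which take `*PodInfo`, `*CycleState`, a `context.Context`, and so on). The queue is ordered with `Less`, then each Pod goes through `PreFilter`. In the real scheduler the queue is a heap, and a Pod rejected in PreFilter is requeued as unschedulable and retried later; this sketch simply skips it.

```go
package main

import (
	"fmt"
	"sort"
)

// Pod is a hypothetical stand-in for v1.Pod with only the fields this sketch needs.
type Pod struct {
	Name     string
	Priority int
}

// QueueSortPlugin and PreFilterPlugin are simplified, hypothetical versions
// of the two extension points.
type QueueSortPlugin interface {
	Less(a, b Pod) bool
}

type PreFilterPlugin interface {
	PreFilter(p Pod) error
}

// demoPlugin puts higher priority first and rejects pods with a negative priority.
type demoPlugin struct{}

func (demoPlugin) Less(a, b Pod) bool { return a.Priority > b.Priority }

func (demoPlugin) PreFilter(p Pod) error {
	if p.Priority < 0 {
		return fmt.Errorf("pod %s rejected in PreFilter", p.Name)
	}
	return nil
}

// runCycle orders the queue with QueueSort, then runs PreFilter on each pod
// in that order and returns the names of the pods that passed.
func runCycle(queue []Pod, qs QueueSortPlugin, pf PreFilterPlugin) []string {
	sort.SliceStable(queue, func(i, j int) bool { return qs.Less(queue[i], queue[j]) })
	var passed []string
	for _, p := range queue {
		if err := pf.PreFilter(p); err != nil {
			fmt.Println(err)
			continue
		}
		passed = append(passed, p.Name)
	}
	return passed
}

func main() {
	queue := []Pod{{"low", 1}, {"rejected", -1}, {"high", 10}}
	p := demoPlugin{}
	fmt.Println(runCycle(queue, p, p)) // prints the rejection, then [high low]
}
```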
### Code Explanation
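First, we configure the KubeSchedulerConfiguration object: it enables the extension points we implement and disables the ones they replace.
Here we implement a plugin named `riya` and enable it at the queueSort and preFilter extension points,
while disabling every other queueSort plugin (`"*"`).
When several plugins are enabled at the same extension point, they run in this order:
+ Default plugins run first, followed by custom plugins
+ Custom plugins run in the order in which they are listed under `enabled`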
```yaml=
apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
schedulerName: riya-scheduler
leaderElection:
  leaderElect: true
  lockObjectName: riya-scheduler
  lockObjectNamespace: kube-system
plugins:
  queueSort:
    enabled:
      - name: "riya"
    disabled:
      - name: "*"
  preFilter:
    enabled:
      - name: "riya"
pluginConfig:
  - name: "riya"
    args: {"master": "master", "kubeconfig": "kubeconfig"}
```
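The `args` object under `pluginConfig` ends up in the plugin's `Args` struct: `New` passes it to `framework.DecodeInto`, which fills the struct through its `json` tags. As we read the v1alpha1 framework code, for JSON content this amounts to a `json.Unmarshal` of the raw bytes; the standard-library sketch below assumes exactly that, and `decodeArgs` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Args mirrors the Args struct in pkg/plugins/riya/scheduler.go.
type Args struct {
	KubeConfig string `json:"kubeconfig,omitempty"`
	Master     string `json:"master,omitempty"`
}

// decodeArgs fills Args from the raw pluginConfig args (assumed to be JSON).
func decodeArgs(raw []byte) (*Args, error) {
	args := &Args{}
	if err := json.Unmarshal(raw, args); err != nil {
		return nil, err
	}
	return args, nil
}

func main() {
	// The same object as the args field in pluginConfig above.
	raw := []byte(`{"master": "master", "kubeconfig": "kubeconfig"}`)
	args, err := decodeArgs(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", *args) // {KubeConfig:kubeconfig Master:master}
}
```

Keys that are missing from `args` simply leave the corresponding field empty.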
Looking at the kube-scheduler source code, we can see that plugins are registered through the `Option` type.
<font color = gray>**cmd/kube-scheduler/app/server.go**</font>
```go
// Option configures a framework.Registry.
type Option func(framework.Registry) error

// NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
	...
}

// WithPlugin creates an Option based on plugin name and factory. Please don't remove this function: it is used to register out-of-tree plugins,
// hence there are no references to it from the kubernetes scheduler code base.
func WithPlugin(name string, factory framework.PluginFactory) Option {
	return func(registry framework.Registry) error {
		return registry.Register(name, factory)
	}
}
```
We create an `Option` instance with `WithPlugin`
and pass it to `NewSchedulerCommand`, which registers the plugin with the default scheduler.
**pkg/plugins/register.go**
```go
package plugins

import (
	"github.com/spf13/cobra"
	"k8s.io/kubernetes/cmd/kube-scheduler/app"

	"github.com/NTHU-LSALAB/riya-scheduler/pkg/plugins/riya"
)

// Register returns the kube-scheduler command with the riya plugin
// registered as an out-of-tree plugin.
func Register() *cobra.Command {
	return app.NewSchedulerCommand(
		app.WithPlugin(riya.Name, riya.New),
	)
}
```
The `main` function triggers the registration and then runs the scheduler with the plugin.
**cmd/scheduler/main.go**
``` go
package main

import (
	"fmt"
	"math/rand"
	"os"
	"time"

	"github.com/NTHU-LSALAB/riya-scheduler/pkg/plugins"
)

func main() {
	rand.Seed(time.Now().UTC().UnixNano())
	// Build the kube-scheduler command with the riya plugin registered, then run it.
	cmd := plugins.Register()
	if err := cmd.Execute(); err != nil {
		_, _ = fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}
```
The `riya.New` passed to `WithPlugin` above must match the following `PluginFactory` function type:
<font color = gray>**pkg/scheduler/framework/v1alpha1/registry.go**</font>
```go
// PluginFactory is a function that builds a plugin.
type PluginFactory = func(configuration *runtime.Unknown, f FrameworkHandle) (Plugin, error)
```
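The chain above (an `Option` built by `WithPlugin`, handed to `NewSchedulerCommand`, wrapping a factory such as `riya.New`) is a functional-options pattern around a name-to-factory map. The standard-library sketch below re-creates it with hypothetical, simplified types (`Plugin`, `PluginFactory`, `Registry`, `Option`, `WithPlugin`, `buildRegistry`). It assumes, as we read the kube-scheduler source of this era, that the options are applied to a separate out-of-tree registry which is then merged with the in-tree one, and that a duplicate plugin name is an error; the real code paths differ in detail.

```go
package main

import "fmt"

// Plugin, PluginFactory, Registry, Option and WithPlugin are simplified,
// hypothetical re-creations of the kube-scheduler types.
type Plugin interface{ Name() string }

type PluginFactory func() (Plugin, error)

type Registry map[string]PluginFactory

// Register adds a factory under name and refuses duplicates.
func (r Registry) Register(name string, factory PluginFactory) error {
	if _, ok := r[name]; ok {
		return fmt.Errorf("a plugin named %v already exists", name)
	}
	r[name] = factory
	return nil
}

// Option configures a Registry.
type Option func(Registry) error

// WithPlugin wraps a single Register call in an Option.
func WithPlugin(name string, factory PluginFactory) Option {
	return func(r Registry) error { return r.Register(name, factory) }
}

// buildRegistry applies every Option to an empty out-of-tree registry and
// merges it into a copy of the in-tree one, failing on name clashes.
func buildRegistry(inTree Registry, opts ...Option) (Registry, error) {
	outOfTree := Registry{}
	for _, opt := range opts {
		if err := opt(outOfTree); err != nil {
			return nil, err
		}
	}
	merged := Registry{}
	for name, f := range inTree {
		merged[name] = f
	}
	for name, f := range outOfTree {
		if err := merged.Register(name, f); err != nil {
			return nil, err
		}
	}
	return merged, nil
}

type named string

func (n named) Name() string { return string(n) }

// factoryFor returns a factory that builds a plugin with the given name
// (standing in for riya.New).
func factoryFor(name string) PluginFactory {
	return func() (Plugin, error) { return named(name), nil }
}

func main() {
	inTree := Registry{"InTreeSort": factoryFor("InTreeSort")}
	reg, err := buildRegistry(inTree, WithPlugin("riya", factoryFor("riya")))
	if err != nil {
		panic(err)
	}
	p, _ := reg["riya"]()
	fmt.Println(len(reg), p.Name()) // 2 riya

	// Reusing a name that already exists in-tree fails.
	_, err = buildRegistry(inTree, WithPlugin("InTreeSort", factoryFor("x")))
	fmt.Println(err) // a plugin named InTreeSort already exists
}
```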
The plugin itself is implemented as follows:
**pkg/plugins/riya/scheduler.go**
```go
package riya

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/klog"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"

	// custom
	"github.com/NTHU-LSALAB/riya-scheduler/pkg/plugins/riya/prefilter"
	"github.com/NTHU-LSALAB/riya-scheduler/pkg/plugins/riya/queueSort"
)

const (
	// Name is the plugin name used in KubeSchedulerConfiguration.
	Name = "riya"
	// PodGroupName is the label key that names the Pod's pod group.
	PodGroupName = "podGroup"
	// PodGroupMinAvailable is the label key holding the group's minimum size.
	PodGroupMinAvailable = "minAvailable"
)

// Compile-time checks that Riya implements both extension points.
var (
	_ framework.QueueSortPlugin = &Riya{}
	_ framework.PreFilterPlugin = &Riya{}
)

// Args holds the plugin arguments from pluginConfig.
type Args struct {
	KubeConfig string `json:"kubeconfig,omitempty"`
	Master     string `json:"master,omitempty"`
}

// Riya implements the QueueSort and PreFilter extension points.
type Riya struct {
	args      *Args
	handle    framework.FrameworkHandle
	podLister corelisters.PodLister
}

// New is the PluginFactory for Riya.
func New(configuration *runtime.Unknown, f framework.FrameworkHandle) (framework.Plugin, error) {
	args := &Args{}
	if err := framework.DecodeInto(configuration, args); err != nil {
		return nil, err
	}
	klog.V(3).Infof("Get plugin config args: %+v", args)
	podLister := f.SharedInformerFactory().Core().V1().Pods().Lister()
	return &Riya{
		args:      args,
		handle:    f,
		podLister: podLister,
	}, nil
}

// Name returns the plugin name.
func (r *Riya) Name() string {
	return Name
}

// Less implements QueueSort by delegating to the queueSort package.
func (r *Riya) Less(podInfo1, podInfo2 *framework.PodInfo) bool {
	klog.V(3).Info("---QueueSort---")
	return queueSort.Less(podInfo1, podInfo2)
}

// PreFilter rejects a Pod whose pod group has fewer than minAvailable Pods.
func (r *Riya) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
	klog.V(3).Info("---PreFilter---")
	podGroupName, minAvailable, err := prefilter.GetPodGroupLabels(pod)
	if err != nil {
		return framework.NewStatus(framework.Error, err.Error())
	}
	// Pods outside a group, or groups that need only one Pod, pass immediately.
	if podGroupName == "" || minAvailable <= 1 {
		return framework.NewStatus(framework.Success, "")
	}
	total := r.calculateTotalPods(podGroupName, pod.Namespace)
	if total < minAvailable {
		klog.V(3).Infof("The count of podGroup %v/%v/%v is not up to minAvailable(%d) in PreFilter: %d",
			pod.Namespace, podGroupName, pod.Name, minAvailable, total)
		return framework.NewStatus(framework.Unschedulable, "less than minAvailable")
	}
	return framework.NewStatus(framework.Success, "")
}

// PreFilterExtensions returns nil because the plugin does not implement AddPod/RemovePod.
func (r *Riya) PreFilterExtensions() framework.PreFilterExtensions {
	return nil
}

// calculateTotalPods counts the Pods in namespace that carry the given podGroup label.
func (r *Riya) calculateTotalPods(podGroupName, namespace string) int {
	// TODO get the total pods from the scheduler cache and queue
	selector := labels.Set{PodGroupName: podGroupName}.AsSelector()
	pods, err := r.podLister.Pods(namespace).List(selector)
	if err != nil {
		klog.Error(err)
		return 0
	}
	return len(pods)
}
```
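Stripped of Kubernetes types, `PreFilter` is a small gang-scheduling check: count the Pods in the same namespace that carry the same `podGroup` label, and refuse to schedule until that count reaches `minAvailable`. The sketch below is hypothetical: `pod`, `Code`, `countGroup` and `preFilterDecision` are stand-ins, and a plain slice replaces the informer-backed `PodLister`.

```go
package main

import "fmt"

// Code is a hypothetical stand-in for framework.Code.
type Code int

const (
	Success Code = iota
	Unschedulable
)

// pod is a hypothetical simplification of v1.Pod.
type pod struct {
	Namespace string
	Labels    map[string]string
}

// countGroup plays the role of calculateTotalPods: it counts pods in namespace
// whose "podGroup" label equals groupName, as the label selector does.
func countGroup(pods []pod, groupName, namespace string) int {
	n := 0
	for _, p := range pods {
		if v, ok := p.Labels["podGroup"]; ok && v == groupName && p.Namespace == namespace {
			n++
		}
	}
	return n
}

// preFilterDecision mirrors the branches of Riya.PreFilter.
func preFilterDecision(groupName string, minAvailable, total int) Code {
	if groupName == "" || minAvailable <= 1 {
		return Success // not in a group, or a group of one
	}
	if total < minAvailable {
		return Unschedulable // wait until enough group members exist
	}
	return Success
}

func main() {
	cluster := []pod{
		{"default", map[string]string{"podGroup": "A"}},
		{"default", map[string]string{"podGroup": "A"}},
		{"other", map[string]string{"podGroup": "A"}},
		{"default", map[string]string{"podGroup": "B"}},
	}
	total := countGroup(cluster, "A", "default")
	fmt.Println(total, preFilterDecision("A", 3, total) == Unschedulable) // 2 true
	fmt.Println(preFilterDecision("A", 2, total) == Success)              // true
}
```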
**pkg/plugins/riya/prefilter/prefilter.go**
```go
package prefilter

import (
	"strconv"

	v1 "k8s.io/api/core/v1"
	"k8s.io/klog"
)

const (
	// PodGroupName is the label key that names the Pod's pod group.
	PodGroupName = "podGroup"
	// PodGroupMinAvailable is the label key for the group's minimum number of Pods.
	PodGroupMinAvailable = "minAvailable"
)

// GetPodGroupLabels checks whether the pod belongs to a podGroup.
// If so, it returns the podGroupName and minAvailable of that podGroup.
// If not (or if minAvailable is not an integer), it returns "" as podGroupName.
func GetPodGroupLabels(pod *v1.Pod) (string, int, error) {
	podGroupName, exist := pod.Labels[PodGroupName]
	if !exist || podGroupName == "" {
		return "", 0, nil
	}
	minAvailable, exist := pod.Labels[PodGroupMinAvailable]
	if !exist || minAvailable == "" {
		return "", 0, nil
	}
	minNum, err := strconv.Atoi(minAvailable)
	if err != nil {
		klog.Errorf("GetPodGroupLabels err in riya-scheduling %v/%v : %v", pod.Namespace, pod.Name, err.Error())
		return "", 0, nil
	}
	return podGroupName, minNum, nil
}
```
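The label rules are easy to check in isolation. This sketch re-implements them over a plain `map[string]string` (the helper name `podGroupFromLabels` is hypothetical) and shows one consequence worth knowing: a Pod whose `minAvailable` label is missing or not an integer is treated as not belonging to any group, so `PreFilter` lets it through.

```go
package main

import (
	"fmt"
	"strconv"
)

// podGroupFromLabels follows the same rules as GetPodGroupLabels but takes a
// plain label map. A missing key reads as "", which covers both the
// "!exist" and the empty-value cases.
func podGroupFromLabels(labels map[string]string) (string, int) {
	name := labels["podGroup"]
	minAvailable := labels["minAvailable"]
	if name == "" || minAvailable == "" {
		return "", 0
	}
	n, err := strconv.Atoi(minAvailable)
	if err != nil {
		// Malformed value: treated as "not in a group".
		return "", 0
	}
	return name, n
}

func main() {
	for _, l := range []map[string]string{
		{"podGroup": "A", "minAvailable": "3"},
		{"podGroup": "A"},
		{"podGroup": "A", "minAvailable": "three"},
	} {
		name, n := podGroupFromLabels(l)
		fmt.Printf("%q %d\n", name, n) // "A" 3, then "" 0 twice
	}
}
```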
**pkg/plugins/riya/queueSort/queueSort.go**
```go
package queueSort

import (
	"strconv"

	v1 "k8s.io/api/core/v1"
	"k8s.io/klog"
	"k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// Less reports whether podInfo1 should be scheduled before podInfo2:
// a higher "groupPriority" label wins, and ties are broken by QoS class
// (Guaranteed > Burstable > BestEffort).
func Less(podInfo1, podInfo2 *framework.PodInfo) bool {
	pw1 := getPodPriority(podInfo1)
	pw2 := getPodPriority(podInfo2)
	klog.V(3).Infof("The group priority of %v/%v is %d", podInfo1.Pod.Namespace, podInfo1.Pod.Name, pw1)
	klog.V(3).Infof("The group priority of %v/%v is %d", podInfo2.Pod.Namespace, podInfo2.Pod.Name, pw2)
	if pw1 != pw2 {
		return pw1 > pw2
	}
	qosFirst := compareQOS(podInfo1.Pod, podInfo2.Pod)
	klog.V(3).Infof("QoS %v before %v: %v", podInfo1.Pod.Name, podInfo2.Pod.Name, qosFirst)
	return qosFirst
}

// getPodPriority reads the "groupPriority" label; a missing or malformed
// value counts as 0.
func getPodPriority(podInfo *framework.PodInfo) int64 {
	var priority int64
	if val, ok := podInfo.Pod.Labels["groupPriority"]; ok {
		priority, _ = strconv.ParseInt(val, 10, 64)
	}
	return priority
}

// qosRank maps a QoS class to a number; larger means scheduled earlier.
func qosRank(p *v1.Pod) int {
	switch qos.GetPodQOS(p) {
	case v1.PodQOSGuaranteed:
		return 2
	case v1.PodQOSBurstable:
		return 1
	default: // v1.PodQOSBestEffort
		return 0
	}
}

// compareQOS reports whether p1 has a strictly higher QoS class than p2.
// Returning false for equal classes keeps Less a strict ordering.
func compareQOS(p1, p2 *v1.Pod) bool {
	r1, r2 := qosRank(p1), qosRank(p2)
	klog.V(3).Infof("The QoS rank of %v/%v is %d", p1.Namespace, p1.Name, r1)
	klog.V(3).Infof("The QoS rank of %v/%v is %d", p2.Namespace, p2.Name, r2)
	return r1 > r2
}
```
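The resulting queue order can be checked without a cluster. In this sketch, `item`, `less` and `order` are hypothetical, the QoS class is reduced to an integer rank (Guaranteed > Burstable > BestEffort), and the comparison is kept strict, so two Pods with the same priority and the same QoS class are treated as equal. `sort.SliceStable` then keeps their input order, whereas the scheduler's heap gives no such guarantee.

```go
package main

import (
	"fmt"
	"sort"
)

// QoS ranks; larger means "schedule earlier".
const (
	BestEffort = iota
	Burstable
	Guaranteed
)

// item is a hypothetical pod summary: the groupPriority label value and a QoS rank.
type item struct {
	Name          string
	GroupPriority int64
	QoS           int
}

// less puts a higher groupPriority first and breaks ties by higher QoS rank.
// Equal priority and equal QoS compare as "not less" in both directions.
func less(a, b item) bool {
	if a.GroupPriority != b.GroupPriority {
		return a.GroupPriority > b.GroupPriority
	}
	return a.QoS > b.QoS
}

// order sorts the items with less and returns their names in queue order.
func order(items []item) []string {
	sort.SliceStable(items, func(i, j int) bool { return less(items[i], items[j]) })
	names := make([]string, len(items))
	for i, it := range items {
		names[i] = it.Name
	}
	return names
}

func main() {
	fmt.Println(order([]item{
		{"p1", 1, BestEffort},
		{"p2", 3, BestEffort},
		{"p3", 3, Guaranteed},
		{"p4", 1, Burstable},
	})) // [p3 p2 p4 p1]
}
```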
## Build
Compile the scheduler and write the binary to the `_output/bin/` directory:
```bash=
$ go build -o=_output/bin/riya-scheduler ./cmd/scheduler
```
Build the image from the Dockerfile:
```bash=
$ sudo docker build --no-cache . -t riyazhu/riya-scheduler
```
```Dockerfile
FROM debian:stretch-slim
WORKDIR /
COPY _output/bin/riya-scheduler /usr/local/bin
CMD ["riya-scheduler"]
```
Push the image to Docker Hub:
```bash=
$ sudo docker push riyazhu/riya-scheduler
```
### Deployment
```bash=
$ kubectl apply -f {filename}
```
Set up the RBAC permissions:
**deploy/rbac.yaml**
:::spoiler
```yaml=
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: riya-cr
rules:
  - apiGroups:
      - ""
    resources:
      - endpoints
      - events
    verbs:
      - create
      - get
      - update
  - apiGroups:
      - ""
    resourceNames:
      - riya-scheduler
    resources:
      - endpoints
    verbs:
      - delete
      - get
      - patch
      - update
  - apiGroups:
      - ""
    resources:
      - nodes
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - ""
    resources:
      - pods
    verbs:
      - delete
      - get
      - list
      - watch
      - update
  - apiGroups:
      - ""
    resources:
      - bindings
      - pods/binding
    verbs:
      - create
  - apiGroups:
      - ""
    resources:
      - pods/status
    verbs:
      - patch
      - update
  - apiGroups:
      - ""
    resources:
      - replicationcontrollers
      - services
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - apps
      - extensions
    resources:
      - replicasets
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - apps
    resources:
      - statefulsets
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - policy
    resources:
      - poddisruptionbudgets
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - ""
    resources:
      - persistentvolumeclaims
      - persistentvolumes
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - ""
    resources:
      - configmaps
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - "storage.k8s.io"
    resources:
      - storageclasses
      - csinodes
    verbs:
      - watch
      - list
      - get
  - apiGroups:
      - "coordination.k8s.io"
    resources:
      - leases
    verbs:
      - create
      - get
      - list
      - update
  - apiGroups:
      - "events.k8s.io"
    resources:
      - events
    verbs:
      - create
      - patch
      - update
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: riya-sa
  namespace: kube-system
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: riya-crb
  namespace: kube-system
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: riya-cr
subjects:
  - kind: ServiceAccount
    name: riya-sa
    namespace: kube-system
```
:::
**deploy/deployment.yaml**
:::spoiler
```yaml=
apiVersion: v1
kind: ConfigMap
metadata:
  name: scheduler-config
  namespace: kube-system
data:
  scheduler-config.yaml: |
    apiVersion: kubescheduler.config.k8s.io/v1alpha1
    kind: KubeSchedulerConfiguration
    schedulerName: riya-scheduler
    leaderElection:
      leaderElect: true
      lockObjectName: riya-scheduler
      lockObjectNamespace: kube-system
    plugins:
      queueSort:
        enabled:
          - name: "riya"
        disabled:
          - name: "*"
      preFilter:
        enabled:
          - name: "riya"
      # filter:
      #   enabled:
      #     - name: "riya"
    pluginConfig:
      - name: "riya"
        args: {"master": "master", "kubeconfig": "kubeconfig"}
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: riya-scheduler
  namespace: kube-system
  labels:
    component: riya-scheduler
spec:
  replicas: 1
  selector:
    matchLabels:
      component: riya-scheduler
  template:
    metadata:
      labels:
        component: riya-scheduler
    spec:
      serviceAccount: riya-sa
      priorityClassName: system-cluster-critical
      volumes:
        - name: scheduler-config
          configMap:
            name: scheduler-config
      containers:
        - name: riya-scheduler
          image: riyazhu/riya-scheduler:latest
          imagePullPolicy: Always
          args:
            - riya-scheduler
            - --config=/scheduler/scheduler-config.yaml
            - --v=3
          resources:
            requests:
              cpu: "50m"
          volumeMounts:
            - name: scheduler-config
              mountPath: /scheduler
```
:::
## Reference
+ https://kubernetes.io/docs/concepts/scheduling-eviction/scheduling-framework/
+ https://github.com/kubernetes/enhancements/blob/master/keps/sig-scheduling/20180409-scheduling-framework.md
+ https://www.qikqiak.com/post/custom-kube-scheduler/
+ https://github.com/NJUPT-ISL/Yoda-Scheduler