# Solution - Operator

###### tags: `Assignment`, `Operator`, `Solution`

## Outline

This is the solution to [Assignment - Operator](https://hackmd.io/Nb-PIgHdS4mgf_yaEgFsVg), implemented with **client-go**. It is provided for reference only; the [code](https://github.com/StarCoral/K8S_Study_Group_2020/tree/master/Operator) is hosted on GitHub.

The walkthrough is split into two parts:
+ Part I: the operator
+ Part II: the monitor pod program (the image behind the pm-{podname} pod)

## Note

Put the packages we will need under `$GOPATH/src`:

```bash=
$ cd $GOPATH/src
# download
$ wget https://lsalab.cs.nthu.edu.tw/~riya/assignment/podMonitorVendor.tar.gz
# extract
$ tar -zxvf ./podMonitorVendor.tar.gz
# move the vendored packages out
$ mv ./vendor/* .
```

## <font color = gray>Part I - Operator </font>

Developing an operator consists of two main stages:
1. Custom Resource Definition
2. Logic implementation

### Custom Resource Definition

First we define a Custom Resource named **PodMonitor**. Applying a CustomResourceDefinition tells Kubernetes that a new resource type now exists:

```bash=
$ kubectl apply -f pm-crd.yaml
```

```yaml
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  # name must match the spec fields below, and be in the form: <plural>.<group>
  name: podmonitors.lsalab.nthu
spec:
  # group name to use for REST API: /apis/<group>/<version>
  group: lsalab.nthu
  # version supported by this CRD
  version: v1
  # either Namespaced or Cluster
  scope: Namespaced
  names:
    # kind is normally the CamelCased singular type. Your resource manifests use this.
    kind: PodMonitor # upper camel case
    # plural name to be used in the URL: /apis/<group>/<version>/<plural>
    plural: podmonitors
    # singular name to be used as an alias on the CLI and for display
    singular: podmonitor
    # shortNames allow shorter string to match your resource on the CLI
    shortNames:
    - pm
```

Check that it was created successfully:

```bash=
$ kubectl get crd
```

![](https://i.imgur.com/0pcY4kk.png)

So far we have only told Kubernetes that such a resource exists; it knows nothing yet about the resource's fields. Next we register the resource with the Kubernetes API in code, which tells the API server what this resource looks like and in what form it should be stored in etcd.

We will write the following files in the project `$GOPATH/src/github.com/NTHU-LSALAB/podMonitor/`:

```bash=
src/github.com
└── NTHU-LSALAB
    └── podMonitor
        ├── deploy
        │   ├── pm-crd.yaml
        │   └── test
        │       ├── pm-example.yaml
        │       ├── pod-test1.yaml
        │       └── pod-test2.yaml
        ├── hack
        │   └── k8s
        │       ├── boilerplate.go.txt
        │       ├── tools.go
        │       ├── update-codegen.sh
        │       └── verify-codegen.sh (optional)
        └── pkg
            └── apis
                └── podmonitor
                    ├── register.go
                    └── v1
                        ├── doc.go
                        ├── register.go
                        └── types.go
```

+ pkg/apis/podmonitor/register.go

```go
package podmonitor

// GroupName is the group name used in this package
const (
	GroupName = "lsalab.nthu"
)
```

+ pkg/apis/podmonitor/v1/doc.go

Change the version to match your own. In this file a global tag sits above the package clause: `+k8s:deepcopy-gen=package` tells deepcopy-gen to generate deep-copy methods for every type in this package by default.

```go
// +k8s:deepcopy-gen=package
// +groupName=lsalab.nthu

// Package v1 is the v1 version of the API
package v1 // import "github.com/NTHU-LSALAB/podMonitor/pkg/apis/lsalab.nthu/v1"
```

:::warning
Note that the code generators have strict requirements on where tags are placed. There are two kinds of tags: local tags and global tags. A local tag is placed directly above a type declaration, while a global tag, as in this file, is placed above the `package` clause. See the short example below.
:::
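As a minimal illustration of the two tag positions (the type name `Example` is just a placeholder, not part of the project):

```go
// A global tag lives above the package clause, typically in doc.go:

// +k8s:deepcopy-gen=package
package v1

// A local tag is placed directly above the type declaration it applies to:

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type Example struct{}
```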
+ pkg/apis/podmonitor/v1/register.go

This file adds the custom resource types to the Scheme:

```go
package v1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: "lsalab.nthu", Version: "v1"}

// Kind takes an unqualified kind and returns back a Group qualified GroupKind
func Kind(kind string) schema.GroupKind {
	return SchemeGroupVersion.WithKind(kind).GroupKind()
}

// Resource takes an unqualified resource and returns back a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
	return SchemeGroupVersion.WithResource(resource).GroupResource()
}

var (
	SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
	AddToScheme   = SchemeBuilder.AddToScheme
)

// addKnownTypes adds the list of known types to Scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
	scheme.AddKnownTypes(SchemeGroupVersion,
		&PodMonitor{},
		&PodMonitorList{},
	)
	metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
	return nil
}
```

+ pkg/apis/podmonitor/v1/types.go

Here we define, according to our requirements, which fields a PodMonitor has. As the figure below shows, the fields of `PodMonitorSpec` correspond to the fields of the YAML manifest. The YAML itself is only a plain text file; it is these type definitions that determine how the parsed YAML is interpreted, so the corresponding logic can act on it.

![](https://i.imgur.com/keAYVUZ.png)

```go
package v1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type PodMonitor struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   PodMonitorSpec   `json:"spec"`
	Status PodMonitorStatus `json:"status"`
}

type PodMonitorPhase string

const (
	PodMonitorNone        PodMonitorPhase = ""
	PodMonitorCreating    PodMonitorPhase = "Creating"
	PodMonitorPending     PodMonitorPhase = "Pending"
	PodMonitorRunning     PodMonitorPhase = "Running"
	PodMonitorTerminating PodMonitorPhase = "Terminating"
	PodMonitorCompleted   PodMonitorPhase = "Completed"
	PodMonitorFailed      PodMonitorPhase = "Failed"
	// PodMonitorUnknown PodMonitorPhase = "Unknown"
)

type PodMonitorSpec struct {
	// the sampling interval in seconds; the default is 30 sec
	Speed int32 `json:"speed,omitempty"`
	// the directory of the file that stores the resource-usage information
	LogDir string `json:"logdir,omitempty"`
}

type PodMonitorStatus struct {
	Phase  PodMonitorPhase `json:"phase"`
	Reason string          `json:"reason,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type PodMonitorList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata"`

	Items []PodMonitor `json:"items"`
}
```

+ hack/k8s/boilerplate.go.txt

This file holds the license header; the code generator automatically prepends it to every file it generates.

```go
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
```

+ hack/k8s/update-codegen.sh

This script wraps the code generator for convenience; it generates the clientset, informers, and listers.

```bash=
#!/usr/bin/env bash

GO111MODULE=off $GOPATH/src/k8s.io/code-generator/generate-groups.sh all \
  github.com/NTHU-LSALAB/podMonitor/pkg/generated \
  github.com/NTHU-LSALAB/podMonitor/pkg/apis \
  podmonitor:v1 \
  --go-header-file $GOPATH/src/github.com/NTHU-LSALAB/podMonitor/hack/k8s/boilerplate.go.txt
```

Run `hack/k8s/update-codegen.sh`:

```bash=
$ chmod +x hack/k8s/update-codegen.sh
$ chmod +x $GOPATH/src/k8s.io/code-generator/generate-groups.sh
$ sh ./hack/k8s/update-codegen.sh
```
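Among the generated files, `pkg/apis/podmonitor/v1/zz_generated.deepcopy.go` is what makes `*PodMonitor` satisfy `runtime.Object`. You never write it by hand, but as a rough sketch, its core looks like this:

```go
// DeepCopyObject returns a generically typed copy of the receiver; it is the
// method that lets *PodMonitor satisfy runtime.Object, which the scheme and
// clientset machinery require.
func (in *PodMonitor) DeepCopyObject() runtime.Object {
	if c := in.DeepCopy(); c != nil {
		return c
	}
	return nil
}
```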
Afterwards the project layout looks like this:

```bash=
.
├── deploy
│   ├── pm-crd.yaml
│   └── test
│       ├── pm-example.yaml
│       ├── pod-test1.yaml
│       └── pod-test2.yaml
├── go.mod
├── go.sum
├── hack
│   └── k8s
│       ├── boilerplate.go.txt
│       ├── tools.go
│       ├── update-codegen.sh
│       └── verify-codegen.sh
└── pkg
    ├── apis
    │   └── podmonitor
    │       ├── register.go
    │       └── v1
    │           ├── doc.go
    │           ├── register.go
    │           ├── types.go
    │           └── zz_generated.deepcopy.go
    └── generated
        ├── clientset
        │   └── versioned
        │       ├── clientset.go
        │       ├── doc.go
        │       ├── fake
        │       │   ├── clientset_generated.go
        │       │   ├── doc.go
        │       │   └── register.go
        │       ├── scheme
        │       │   ├── doc.go
        │       │   └── register.go
        │       └── typed
        │           └── podmonitor
        │               └── v1
        │                   ├── doc.go
        │                   ├── fake
        │                   │   ├── doc.go
        │                   │   ├── fake_podmonitor_client.go
        │                   │   ├── fake_podmonitor.go
        │                   │   └── fake_podmonitorphase.go
        │                   ├── generated_expansion.go
        │                   ├── podmonitor_client.go
        │                   ├── podmonitor.go
        │                   └── podmonitorphase.go
        ├── informers
        │   └── externalversions
        │       ├── factory.go
        │       ├── generic.go
        │       ├── internalinterfaces
        │       │   └── factory_interfaces.go
        │       └── podmonitor
        │           ├── interface.go
        │           └── v1
        │               ├── interface.go
        │               └── podmonitor.go
        └── listers
            └── podmonitor
                └── v1
                    ├── expansion_generated.go
                    └── podmonitor.go
```

### Control Logic

+ main.go

This is the entry point of the program. As the code below shows, it follows the out-of-cluster pattern: the `kubeconfig` and `masterURL` flags are used to build the client configuration with which requests to the API server are authenticated. It then creates the clientsets and informers, and finally runs the actual control logic.

```go=
package main

import (
	"flag"
	"time"

	kubeinformers "k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/klog"

	clientset "github.com/NTHU-LSALAB/podMonitor/pkg/generated/clientset/versioned"
	informers "github.com/NTHU-LSALAB/podMonitor/pkg/generated/informers/externalversions"
	"github.com/NTHU-LSALAB/podMonitor/pkg/signals"
)

var (
	masterURL  string
	kubeconfig string
)

func main() {
	klog.InitFlags(nil)
	flag.Parse()

	// set up signals so we handle the first shutdown signal gracefully
	stopCh := signals.SetupSignalHandler()

	cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
	if err != nil {
		klog.Fatalf("Error building kubeconfig: %s", err.Error())
	}

	// create the actual Kubernetes client set
	kubeClient, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
	}

	pmClient, err := clientset.NewForConfig(cfg)
	if err != nil {
		klog.Fatalf("Error building podMonitor clientset: %s", err.Error())
	}

	// Informers are a combination of this event interface and an in-memory
	// cache with indexed lookup. NewSharedInformerFactory caches all objects
	// of a resource in all namespaces in the store.
	kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
	pmInformerFactory := informers.NewSharedInformerFactory(pmClient, time.Second*30)

	controller := NewController(kubeClient, pmClient,
		kubeInformerFactory.Core().V1().Pods(),
		pmInformerFactory.Lsalab().V1().PodMonitors())

	// notice that there is no need to run the Start methods in a separate goroutine.
	// (i.e. go kubeInformerFactory.Start(stopCh))
	// Start is non-blocking and runs all registered informers in a dedicated goroutine.
	kubeInformerFactory.Start(stopCh)
	pmInformerFactory.Start(stopCh)

	if err = controller.Run(2, stopCh); err != nil {
		klog.Fatalf("Error running controller: %s", err.Error())
	}
}

func init() {
	flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
	flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
}
```
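main.go imports `github.com/NTHU-LSALAB/podMonitor/pkg/signals`, which the directory listings above do not show. It is the usual graceful-shutdown helper from the Kubernetes sample-controller; if you need to recreate it, a minimal single-file `pkg/signals/signal.go` looks roughly like this:

```go
package signals

import (
	"os"
	"os/signal"
	"syscall"
)

var onlyOneSignalHandler = make(chan struct{})

// SetupSignalHandler returns a channel that is closed on the first SIGINT or
// SIGTERM. A second signal terminates the program directly. It may only be
// called once per process.
func SetupSignalHandler() (stopCh <-chan struct{}) {
	close(onlyOneSignalHandler) // panics when called twice

	stop := make(chan struct{})
	c := make(chan os.Signal, 2)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-c
		close(stop) // first signal: ask the controller to stop
		<-c
		os.Exit(1) // second signal: exit immediately
	}()

	return stop
}
```

Once the binary is built (see Build & Deploy below), the controller can be started out-of-cluster with `./podMonitor -kubeconfig=$HOME/.kube/config`.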
+ controller.go

```go=
package main

import (
	"fmt"
	"reflect"
	"strconv"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	// "k8s.io/apimachinery/pkg/runtime/schema"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	corev1informer "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	corev1lister "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog"

	lsalabv1 "github.com/NTHU-LSALAB/podMonitor/pkg/apis/podmonitor/v1"
	clientset "github.com/NTHU-LSALAB/podMonitor/pkg/generated/clientset/versioned"
	pmscheme "github.com/NTHU-LSALAB/podMonitor/pkg/generated/clientset/versioned/scheme"
	informers "github.com/NTHU-LSALAB/podMonitor/pkg/generated/informers/externalversions/podmonitor/v1"
	listers "github.com/NTHU-LSALAB/podMonitor/pkg/generated/listers/podmonitor/v1"
)

const controllerAgentName = "PodMonitorController"

const (
	// PodMonitorLogPath is the host path mounted into the monitor pod
	PodMonitorLogPath = "/var/local"
)

// Controller is the controller implementation for PodMonitor resources
type Controller struct {
	kubeClientset kubernetes.Interface
	pmClientset   clientset.Interface

	podLister  corev1lister.PodLister
	podsSynced cache.InformerSynced
	pmLister   listers.PodMonitorLister
	pmsSynced  cache.InformerSynced

	// workqueue is a rate limited work queue. This is used to queue work to be
	// processed instead of performing it as soon as a change happens. This
	// means we can ensure we only process a fixed amount of resources at a
	// time, and makes it easy to ensure we are never processing the same item
	// simultaneously in two different workers.
	workqueue workqueue.RateLimitingInterface
	// recorder is an event recorder for recording Event resources to the
	// Kubernetes API.
	recorder record.EventRecorder
}

// NewController returns a new pm controller
func NewController(
	kubeClientset kubernetes.Interface,
	pmClientset clientset.Interface,
	podInformer corev1informer.PodInformer,
	pmInformer informers.PodMonitorInformer) *Controller {

	// Create event broadcaster.
	// Add PodMonitorController types to the default kubernetes Scheme so
	// Events can be logged for PodMonitorController types.
	utilruntime.Must(pmscheme.AddToScheme(scheme.Scheme))
	klog.V(4).Info("Creating event broadcaster")
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClientset.CoreV1().Events("")})
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

	controller := &Controller{
		kubeClientset: kubeClientset,
		pmClientset:   pmClientset,
		podLister:     podInformer.Lister(),
		podsSynced:    podInformer.Informer().HasSynced,
		pmLister:      pmInformer.Lister(),
		pmsSynced:     pmInformer.Informer().HasSynced,
		workqueue:     workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "PodMonitors"),
		recorder:      recorder,
	}

	klog.Info("Setting up event handlers")
	// Set up an event handler for when PodMonitor resources change
	pmInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: controller.enqueuePodMonitor,
		UpdateFunc: func(old, new interface{}) {
			controller.enqueuePodMonitor(new)
		},
	})
	// Set up an event handler for when Pod resources change
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: controller.enqueuePod,
		UpdateFunc: func(old, new interface{}) {
			controller.enqueuePod(new)
		},
	})

	return controller
}

// Run will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shut down the workqueue and wait for
// workers to finish processing their current work items.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
	defer utilruntime.HandleCrash()
	defer c.workqueue.ShutDown()

	klog.Info("Starting PodMonitor controller")

	// Wait for the caches to be synced before starting workers
	if ok := cache.WaitForCacheSync(stopCh, c.pmsSynced); !ok {
		return fmt.Errorf("failed to wait for caches to sync")
	}
	if ok := cache.WaitForCacheSync(stopCh, c.podsSynced); !ok {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	klog.Info("Starting workers")
	// Launch `threadiness` workers to process PodMonitor resources
	for i := 0; i < threadiness; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	klog.Info("Started workers")
	<-stopCh
	klog.Info("Shutting down workers")

	return nil
}

// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker() {
	for c.processNextWorkItem() {
	}
}

// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (c *Controller) processNextWorkItem() bool {
	obj, shutdown := c.workqueue.Get()
	if shutdown {
		return false
	}

	// We wrap this block in a func so we can defer c.workqueue.Done.
	err := func(obj interface{}) error {
		// We call Done here so the workqueue knows we have finished
		// processing this item. We also must remember to call Forget if we
		// do not want this work item being re-queued. For example, we do
		// not call Forget if a transient error occurs, instead the item is
		// put back on the workqueue and attempted again after a back-off
		// period.
		defer c.workqueue.Done(obj)
		var key string
		var ok bool
		// We expect strings to come off the workqueue. These are of the
		// form namespace/name. We do this as the delayed nature of the
		// workqueue means the items in the informer cache may actually be
		// more up to date than when the item was initially put onto the
		// workqueue.
		if key, ok = obj.(string); !ok {
			// As the item in the workqueue is actually invalid, we call
			// Forget here else we'd go into a loop of attempting to
			// process a work item that is invalid.
			c.workqueue.Forget(obj)
			utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
			return nil
		}
		// Run the syncHandler, passing it the namespace/name string of the
		// PodMonitor resource to be synced.
		if err := c.syncHandler(key); err != nil {
			// Put the item back on the workqueue to handle any transient errors.
			c.workqueue.AddRateLimited(key)
			return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
		}
		// Finally, if no error occurs we Forget this item so it does not
		// get queued again until another change happens.
		c.workqueue.Forget(obj)
		klog.Infof("Successfully synced '%s'", key)
		return nil
	}(obj)

	if err != nil {
		utilruntime.HandleError(err)
		return true
	}

	return true
}
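
// The reconcile logic below implements a small phase state machine (derived
// from the switch cases in syncHandler): ""(None) -> Pending; Pending/Failed
// -> Running once the pod to monitor exists; Running -> launch the pm-<name>
// monitor pod, then -> Completed when that pod succeeds or fails.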

// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the PodMonitor
// resource with the current status of the resource.
func (c *Controller) syncHandler(key string) error {
	klog.Infof("------------- Reconciling Resource PodMonitor %s -------------", key)
	// Convert the namespace/name string into a distinct namespace and name
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
		return nil
	}

	// Get the PodMonitor resource with this namespace/name
	pm, err := c.pmLister.PodMonitors(namespace).Get(name)
	if err != nil {
		// The PodMonitor resource may no longer exist, in which case we stop
		// processing.
		if errors.IsNotFound(err) {
			utilruntime.HandleError(fmt.Errorf("pm '%s' in work queue no longer exists", key))
			return nil
		}
		// Any other error is transient: return it so the item is requeued.
		return err
	}

	pm_pod := pm.DeepCopy()

	switch pm.Status.Phase {
	case lsalabv1.PodMonitorNone:
		if err := c.changePhase(pm, lsalabv1.PodMonitorPending); err != nil {
			return err
		}
	case lsalabv1.PodMonitorPending, lsalabv1.PodMonitorFailed:
		klog.Infof("PodMonitor %s: phase=PENDING", key)
		// pod := createPodForCRD(pm)
		_, err := c.podLister.Pods(namespace).Get(name)
		if errors.IsNotFound(err) {
			klog.Info("Can't find the pod to monitor.")
			return err
		}
		if err := c.changePhase(pm, lsalabv1.PodMonitorRunning); err != nil {
			return err
		}
	case lsalabv1.PodMonitorRunning:
		klog.Infof("PodMonitor %s: phase=RUNNING", key)
		pod := createPodForCRD(pm)
		// Set the pm instance as the owner and controller
		owner := metav1.NewControllerRef(pm_pod, lsalabv1.SchemeGroupVersion.WithKind("PodMonitor"))
		pod.ObjectMeta.OwnerReferences = append(pod.ObjectMeta.OwnerReferences, *owner)

		found, err := c.kubeClientset.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
		if err != nil && errors.IsNotFound(err) {
			found, err = c.kubeClientset.CoreV1().Pods(pod.Namespace).Create(pod)
			if err != nil {
				return err
			}
			klog.Infof("PodMonitor %s: pod launched: name=%s", key, pod.Name)
		} else if err != nil {
			return err
		} else if found.Status.Phase == corev1.PodFailed || found.Status.Phase == corev1.PodSucceeded {
			klog.Infof("PodMonitor %s: container terminated: reason=%q message=%q",
				key, found.Status.Reason, found.Status.Message)
			pm_pod.Status.Phase = lsalabv1.PodMonitorCompleted
		} else {
			return nil
		}
	case lsalabv1.PodMonitorCompleted:
		klog.Infof("PodMonitor %s: phase=COMPLETED", key)
		return nil
	default:
		klog.Infof("PodMonitor %s: NOP", key)
		return nil
	}

	if !reflect.DeepEqual(pm, pm_pod) {
		_, err := c.pmClientset.LsalabV1().PodMonitors(pm_pod.Namespace).UpdateStatus(pm_pod)
		if err != nil {
			return err
		}
	}
	return nil
}

// enqueuePodMonitor takes a PodMonitor resource and converts it into a
// namespace/name string which is then put onto the work queue. This method
// should *not* be passed resources of any type other than PodMonitor.
func (c *Controller) enqueuePodMonitor(obj interface{}) {
	var key string
	var err error
	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
		utilruntime.HandleError(err)
		return
	}
	c.workqueue.Add(key)
}

// enqueuePod looks up the PodMonitor that owns the given pod (if any) and
// enqueues that PodMonitor.
func (c *Controller) enqueuePod(obj interface{}) {
	var pod *corev1.Pod
	var ok bool
	if pod, ok = obj.(*corev1.Pod); !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("error decoding pod, invalid type"))
			return
		}
		pod, ok = tombstone.Obj.(*corev1.Pod)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("error decoding pod tombstone, invalid type"))
			return
		}
		klog.V(4).Infof("Recovered deleted pod '%s' from tombstone", pod.GetName())
	}
	if ownerRef := metav1.GetControllerOf(pod); ownerRef != nil {
		if ownerRef.Kind != "PodMonitor" {
			return
		}
		pm, err := c.pmLister.PodMonitors(pod.GetNamespace()).Get(ownerRef.Name)
		if err != nil {
			klog.V(4).Infof("ignoring orphaned pod '%s' of PodMonitor '%s'", pod.GetSelfLink(), ownerRef.Name)
			return
		}
		klog.Infof("enqueuing PodMonitor %s/%s because pod changed", pm.Namespace, pm.Name)
		c.enqueuePodMonitor(pm)
	}
}

func (c *Controller) changePhase(pm *lsalabv1.PodMonitor, phase lsalabv1.PodMonitorPhase) error {
	// Clone because the pm object is owned by the lister
	pmCopy := pm.DeepCopy()
	return c.updateStatus(pmCopy, phase, nil)
}

func (c *Controller) updateStatus(pm *lsalabv1.PodMonitor, phase lsalabv1.PodMonitorPhase, reason error) error {
	pm.Status.Reason = ""
	if reason != nil {
		pm.Status.Reason = reason.Error()
	}
	pm.Status.Phase = phase
	_, err := c.pmClientset.LsalabV1().PodMonitors(pm.Namespace).Update(pm)
	return err
}

// createPodForCRD builds the pm-<name> monitor pod for the given PodMonitor:
// it injects the PODMONITOR_* environment variables and mounts the host log
// directory into the container.
func createPodForCRD(pm *lsalabv1.PodMonitor) *corev1.Pod {
	labels := map[string]string{
		"podmonitors.lsalab.nthu": pm.Name,
		"podmonitors":             "true",
	}
	PodMonitorLogDir := pm.Spec.LogDir
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "podmonitor-log",
			Namespace: pm.Namespace,
			Labels:    labels,
		},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{
				{
					Name:  "podmonitor",
					Image: "riyazhu/podmonitor:latest",
					// Name:  "busybox",
					// Image: "busybox",
				},
			},
			RestartPolicy:      corev1.RestartPolicyOnFailure,
			ServiceAccountName: "podmonitor-metricstest",
		},
	}

	podTem := pod.Spec.DeepCopy()
	for i := range podTem.Containers {
		c := &podTem.Containers[i]
		c.Env = append(c.Env,
			corev1.EnvVar{Name: "PODMONITOR_NAMESPACE", Value: pm.Namespace},
			corev1.EnvVar{Name: "PODMONITOR_NAME", Value: pm.Name},
			corev1.EnvVar{Name: "PODMONITOR_SPEED", Value: strconv.Itoa(int(pm.Spec.Speed))},
			corev1.EnvVar{Name: "PODMONITOR_LOGDIR", Value: pm.Spec.LogDir},
		)
		c.VolumeMounts = append(c.VolumeMounts,
			corev1.VolumeMount{
				Name:      "podmonitor-log",
				MountPath: PodMonitorLogDir,
			},
		)
	}
	podTem.Volumes = append(podTem.Volumes,
		corev1.Volume{
			Name: "podmonitor-log",
			VolumeSource: corev1.VolumeSource{
				HostPath: &corev1.HostPathVolumeSource{
					Path: PodMonitorLogPath,
				},
			},
		},
	)

	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "pm-" + pm.Name,
			Namespace: pm.Namespace,
			Labels:    labels,
		},
		Spec: *podTem,
	}
}
```
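With the generated fake clientsets (`pkg/generated/clientset/versioned/fake`), the reconcile logic can be sanity-checked without a cluster. The test below is our own minimal sketch, not part of the assignment: it drives a single `syncHandler` call and checks the first phase transition, None to Pending.

```go
package main

import (
	"testing"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	kubeinformers "k8s.io/client-go/informers"
	k8sfake "k8s.io/client-go/kubernetes/fake"

	lsalabv1 "github.com/NTHU-LSALAB/podMonitor/pkg/apis/podmonitor/v1"
	pmfake "github.com/NTHU-LSALAB/podMonitor/pkg/generated/clientset/versioned/fake"
	pminformers "github.com/NTHU-LSALAB/podMonitor/pkg/generated/informers/externalversions"
)

func TestPhaseNoneMovesToPending(t *testing.T) {
	// A PodMonitor whose Status.Phase is the zero value, i.e. PodMonitorNone.
	pm := &lsalabv1.PodMonitor{
		ObjectMeta: metav1.ObjectMeta{Name: "test1", Namespace: "default"},
	}

	kubeClient := k8sfake.NewSimpleClientset()
	pmClient := pmfake.NewSimpleClientset(pm)

	kubeFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
	pmFactory := pminformers.NewSharedInformerFactory(pmClient, time.Second*30)

	c := NewController(kubeClient, pmClient,
		kubeFactory.Core().V1().Pods(),
		pmFactory.Lsalab().V1().PodMonitors())

	// Prime the lister cache directly instead of starting the informers.
	pmFactory.Lsalab().V1().PodMonitors().Informer().GetIndexer().Add(pm)

	if err := c.syncHandler("default/test1"); err != nil {
		t.Fatalf("syncHandler: %v", err)
	}

	got, err := pmClient.LsalabV1().PodMonitors("default").Get("test1", metav1.GetOptions{})
	if err != nil {
		t.Fatal(err)
	}
	if got.Status.Phase != lsalabv1.PodMonitorPending {
		t.Errorf("expected phase %q, got %q", lsalabv1.PodMonitorPending, got.Status.Phase)
	}
}
```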
### Build & Deploy

+ Build the executable. You can test with the binary directly, or run the operator as a pod:

```bash
$ go build -o podMonitor .
```

+ Build the image. Dockerfile:

```shell=
FROM golang:1.14.7 AS build-image

ADD . /go/src/github.com/NTHU-LSALAB/podMonitor
WORKDIR /go/src/github.com/NTHU-LSALAB/podMonitor
RUN if [ "$(uname -m)" = "aarch64" ]; then \
      CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o podMonitor .; \
    else \
      CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o podMonitor .; \
    fi

FROM alpine:3.9
COPY --from=build-image /go/src/github.com/NTHU-LSALAB/podMonitor/podMonitor /usr/bin/podMonitor
ENTRYPOINT ["podMonitor"]
```
+ Set up permissions & deploy:

```bash=
$ kubectl apply -f deploy.yaml
```

```yaml=
apiVersion: v1
kind: ServiceAccount
metadata:
  name: podmonitor-operator
  namespace: kube-system
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: podmonitor-operator
rules:
- apiGroups: [""]
  resources: ["*"]
  verbs: ["*"]
- apiGroups: ["lsalab.nthu"]
  resources: ["*"]
  verbs: ["*"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: podmonitor-operator
subjects:
- kind: ServiceAccount
  name: podmonitor-operator
  namespace: kube-system
roleRef:
  kind: ClusterRole
  name: podmonitor-operator
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: v1
kind: Pod
metadata:
  name: podmonitor-operator
  namespace: kube-system
  labels:
    app: podmonitor-operator
spec:
  serviceAccountName: podmonitor-operator
  containers:
  - name: podmonitor-operator
    image: riyazhu/podmonitor-operator:latest
```

## <font color = gray>Part II</font>

### Main program

This part implements the separate pm-{podname} program, which collects the CPU and memory usage of every container in the pod being monitored; it is then packaged as an image. Since it runs as a pod, this program uses the in-cluster configuration.
```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"time"

	// "k8s.io/client-go/tools/clientcmd"
	// corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/klog"
	metrics "k8s.io/metrics/pkg/client/clientset/versioned"
)

var (
	podmonitor_namespace string = "default"
	podmonitor_name      string
	podmonitor_logdir    string = "./log"
	podmonitor_speed     int32  = 30
	podmonitor_logpath   string
)

func init() {
	// only override the defaults when the environment variables are set
	if v := os.Getenv("PODMONITOR_NAMESPACE"); v != "" {
		podmonitor_namespace = v
	}
	podmonitor_name = os.Getenv("PODMONITOR_NAME")
	if v := os.Getenv("PODMONITOR_LOGDIR"); v != "" {
		podmonitor_logdir = v
	}
	if ps, err := strconv.ParseInt(os.Getenv("PODMONITOR_SPEED"), 10, 32); err == nil {
		podmonitor_speed = int32(ps)
	} else {
		klog.Info("PODMONITOR_SPEED is missing or invalid; falling back to the default of 30 sec.")
	}
	if podmonitor_name == "" {
		klog.Fatalf("The required environment variables are missing")
	}
	podmonitor_logpath = filepath.Join(podmonitor_logdir, podmonitor_name)

	klog.Info("=== Setting up environment variables ===")
	klog.Info("Pod is ", podmonitor_namespace, " ", podmonitor_name)
	klog.Info("Log file will be stored in ", podmonitor_logpath)
	klog.Info("Sampling speed: ", podmonitor_speed, " sec.")
}

func main() {
	// create the log file
	os.MkdirAll(podmonitor_logdir, 0755)
	file, err := os.OpenFile(podmonitor_logpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
	if err != nil {
		fmt.Println("Can't open the file")
	}
	defer file.Close()

	msg := fmt.Sprintf("============ %s / %s ============ \n", podmonitor_namespace, podmonitor_name)
	klog.Info(msg)
	file.WriteString(msg)

	// get the config from the in-cluster environment
	// (KUBERNETES_SERVICE_HOST, KUBERNETES_SERVICE_PORT)
	config, err := rest.InClusterConfig()
	if err != nil {
		klog.Fatalf("Can't get cluster config: %s", err.Error())
	}
	mc, err := metrics.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}

	for {
		pod, err := clientset.CoreV1().Pods(podmonitor_namespace).Get(podmonitor_name, metav1.GetOptions{})
		if err != nil {
			klog.Info(err)
			time.Sleep(time.Duration(podmonitor_speed) * time.Second)
			continue
		}
		msg := fmt.Sprintf("Pod status: %s\n", pod.Status.Phase)
		klog.Info(msg)
		file.WriteString(msg)
		if pod.Status.Phase == "Succeeded" || pod.Status.Phase == "Failed" {
			klog.Info("Done.")
			file.WriteString("Done.\n")
			break
		}

		podMetrics, err := mc.MetricsV1beta1().PodMetricses(podmonitor_namespace).Get(podmonitor_name, metav1.GetOptions{})
		if err != nil {
			// metrics may not be available yet; wait and retry
			klog.Info(err)
			time.Sleep(time.Duration(podmonitor_speed) * time.Second)
			continue
		}
		for _, container := range podMetrics.Containers {
			cpuQuantity, okCPU := container.Usage.Cpu().AsInt64()
			memQuantity, okMem := container.Usage.Memory().AsInt64()
			if !okCPU || !okMem {
				// skip samples that cannot be represented as int64
				continue
			}
			msg := fmt.Sprintf("Container Name: %s \t CPU(cores): %d \t MEMORY(bytes): %d\n", container.Name, cpuQuantity, memQuantity)
			klog.Infof(msg)
			file.WriteString(msg)
		}
		time.Sleep(time.Duration(podmonitor_speed) * time.Second)
	}
}
```

### Set up permissions & deploy

+ Authorization. Because the program needs to talk to the API server, we must grant it the corresponding permissions:

```yaml=
apiVersion: v1
kind: ServiceAccount
metadata:
  name: podmonitor-metricstest
  namespace: default
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  namespace: default
  name: podmonitor-metricstest
rules:
- apiGroups: [""]
  resources: ["*"]
  verbs: ["*"]
- apiGroups: ["lsalab.nthu"]
  resources: ["*"]
  verbs: ["*"]
- apiGroups: ["metrics.k8s.io"]
  resources: ["*"]
  verbs: ["*"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: podmonitor-metricstest
subjects:
- kind: ServiceAccount
  name: podmonitor-metricstest
  namespace: default
roleRef:
  kind: ClusterRole
  name: podmonitor-metricstest
  apiGroup: rbac.authorization.k8s.io
```

+ Dockerfile

```shell=
FROM debian
COPY ./metric-test /metric-test
ENTRYPOINT /metric-test
```

## Reference

+ http://www.ruanyifeng.com/blog/2017/11/bash-set.html
+ https://itnext.io/how-to-generate-client-codes-for-kubernetes-custom-resource-definitions-crd-b4b9907769ba