# Cloud Controller Manager Implementation

## Introduction

This page is the material for the "Cloud Controller Manager implementation walkthrough" session at Kubernetes Internal #2.

### Notes

- Kubernetes version: v1.17 (https://github.com/kubernetes/kubernetes/tree/release-1.17)
- The quoted code has been trimmed to the parts the explanation needs, and comments have been added for explanation.

## References

- https://kubernetes.io/ja/docs/concepts/architecture/cloud-controller/
- https://kubernetes.io/ja/docs/tasks/administer-cluster/running-cloud-controller/
- https://kubernetes.io/ja/docs/tasks/administer-cluster/developing-cloud-controller-manager/
- https://github.com/kubernetes/cloud-provider/blob/release-1.17/cloud.go
- https://github.com/kubernetes/community/blob/1922998843eb61d13eb41d6303e36e5e206a1cee/contributors/design-proposals/cloud-provider/cloud-provider-refactoring.md
- https://github.com/spf13/cobra

## Cloud Controller Manager recap

See these slides (the linked page also has a link to the video):
https://speakerdeck.com/bells17/cloud-controller-manager-deep-dive

## Things worth knowing before reading the Cloud Controller Manager implementation

- Cloud Controller Manager is one of the core Kubernetes components.
- It takes care of the interaction between Kubernetes and the cloud.
- The implementation for the target cloud environment (the Cloud Provider) is built into Cloud Controller Manager and configured; once Cloud Controller Manager is started that way, it performs the processing specific to that cloud.
- An interface is provided for the Cloud Provider side, so you need to write an implementation that satisfies it.
- There is no written specification for Cloud Provider implementations, so to understand the expected behavior you have to read the Cloud Controller Manager code and existing Cloud Provider implementations.
- To use Cloud Controller Manager, kubelet must be started with the `--cloud-provider=external` flag.
- Behind managed Kubernetes offerings such as GKE/EKS/AKS, this Cloud Controller Manager is running and doing various work for you.

## What Cloud Controller Manager does before starting each controller

1. 
Start the command built with the cobra library
https://sourcegraph.com/github.com/kubernetes/kubernetes@release-1.17/-/blob/cmd/cloud-controller-manager/controller-manager.go#L37-52
```go
func main() {
	rand.Seed(time.Now().UnixNano())

	command := app.NewCloudControllerManagerCommand() // ← initialize the command

	// initialize logging
	logs.InitLogs()
	defer logs.FlushLogs()

	if err := command.Execute(); err != nil { // ← run the command
		os.Exit(1)
	}
}
```

2. Notes on cobra
With cobra, calling `cmd.Execute()` as shown below internally invokes the `Run` function defined on the command, so `Run` is where the logic the command actually executes is written.
https://github.com/spf13/cobra#create-rootcmd
```go
var rootCmd = &cobra.Command{
	Use:   "hugo",
	Short: "Hugo is a very fast static site generator",
	Long: `A Fast and Flexible Static Site Generator built with
                love by spf13 and friends in Go.
                Complete documentation is available at http://hugo.spf13.com`,
	Run: func(cmd *cobra.Command, args []string) {
		// Do Stuff Here
	},
}

func Execute() {
	if err := rootCmd.Execute(); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
```
Almost all Kubernetes-related command-line tools are built with cobra, so a rough feel for how to use and read it carries over when you read the code of other components and applications.

3. Command initialization
https://sourcegraph.com/github.com/kubernetes/kubernetes@release-1.17/-/blob/cmd/cloud-controller-manager/app/controllermanager.go#L57-115
```go
// NewCloudControllerManagerCommand creates a *cobra.Command object with default parameters
func NewCloudControllerManagerCommand() *cobra.Command {
	s, err := options.NewCloudControllerManagerOptions() // build the default CCM options (why is the variable called "s"? probably because "c" is needed further down, so an arbitrary name was picked)
	if err != nil {
		klog.Fatalf("unable to initialize command options: %v", err)
	}

	cmd := &cobra.Command{
		Run: func(cmd *cobra.Command, args []string) {
			verflag.PrintAndExitIfRequested() // if the version flag is set, print version info and os.Exit()
			utilflag.PrintFlags(cmd.Flags())  // write every flag to the log

			// build a cloudcontrollerconfig.Config from the options
			c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
			if err != nil {
				fmt.Fprintf(os.Stderr, "%v\n", err)
				os.Exit(1)
			}

			if err := Run(c.Complete(), wait.NeverStop); err != nil {
				fmt.Fprintf(os.Stderr, "%v\n", err)
				os.Exit(1)
			}
		},
	}

	// the code around here only sets up command-line options, so it is cut
	// ...

	return cmd
}
```

4. Run()
After
- initialization
- starting the HTTP server
- setting up leader election

it finally calls startControllers().
https://sourcegraph.com/github.com/kubernetes/kubernetes@release-1.17/-/blob/cmd/cloud-controller-manager/app/controllermanager.go#L117-220
```go
// Run runs the ExternalCMServer. This should never exit.
func Run(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}) error {
	// initialize the Cloud Provider selected with --cloud-provider
	cloud, err := cloudprovider.InitCloudProvider(c.ComponentConfig.KubeCloudShared.CloudProvider.Name, c.ComponentConfig.KubeCloudShared.CloudProvider.CloudConfigFile)
	if err != nil {
		klog.Fatalf("Cloud provider could not be initialized: %v", err)
	}
	if cloud == nil {
		klog.Fatalf("cloud provider is nil")
	}

	// without this cluster ID a warning is logged, but what exactly is this cluster ID...
	if !cloud.HasClusterID() {
		if c.ComponentConfig.KubeCloudShared.AllowUntaggedCloud {
			klog.Warning("detected a cluster without a ClusterID. A ClusterID will be required in the future. Please tag your cluster to avoid any future issues")
		} else {
			klog.Fatalf("no ClusterID found. A ClusterID is required for the cloud provider to function properly. This check can be bypassed by setting the allow-untagged-cloud option")
		}
	}

	// set up /configz, an endpoint that just returns the CCM config
	// it is set here but not used afterwards in this function, so is it even needed?
	// setup /configz endpoint
	if cz, err := configz.New(ConfigzName); err == nil {
		cz.Set(c.ComponentConfig)
	} else {
		klog.Errorf("unable to register configz: %v", err)
	}

	// build the health check objects used to configure the endpoints of the HTTP server started next
	// electionChecker is also passed to leader election at the end of this function
	// Setup any health checks we will want to use.
	var checks []healthz.HealthChecker
	var electionChecker *leaderelection.HealthzAdaptor
	if c.ComponentConfig.Generic.LeaderElection.LeaderElect {
		electionChecker = leaderelection.NewLeaderHealthzAdaptor(time.Second * 20)
		checks = append(checks, electionChecker)
	}

	// start an HTTP server with the following endpoints
	// /healthz
	// /healthz/leaderElection
	// /configz
	// /metrics
	// I have not yet worked out the difference between Secure and Insecure, so please tell me if you already know
	// Start the controller manager HTTP server
	if c.SecureServing != nil {
		unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...)
		handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
		// TODO: handle stoppedCh returned by c.SecureServing.Serve
		if _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
			return err
		}
	}
	if c.InsecureServing != nil {
		unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...)
		insecureSuperuserAuthn := server.AuthenticationInfo{Authenticator: &server.InsecureSuperuser{}}
		handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, nil, &insecureSuperuserAuthn)
		if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil {
			return err
		}
	}

	// build the run function that starts each Cloud Controller Manager controller
	run := func(ctx context.Context) {
		if err := startControllers(c, ctx.Done(), cloud, newControllerInitializers()); err != nil {
			klog.Fatalf("error running controllers: %v", err)
		}
	}

	// if leader election is not enabled, call run directly
	if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
		run(context.TODO())
		panic("unreachable")
	}

	// everything below sets up running the run function under leader election
	// in the end, the run function set as leaderelection.LeaderElectionConfig.Callbacks.OnStartedLeading
	// appears to be called when this process becomes the leader

	// Identity used to distinguish between multiple cloud controller manager instances
	id, err := os.Hostname()
	if err != nil {
		return err
	}
	// add a uniquifier so that two processes on the same host don't accidentally both become active
	id = id + "_" + string(uuid.NewUUID())

	// Lock required for leader election
	rl, err := resourcelock.New(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
		c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
		c.ComponentConfig.Generic.LeaderElection.ResourceName,
		c.LeaderElectionClient.CoreV1(),
		c.LeaderElectionClient.CoordinationV1(),
		resourcelock.ResourceLockConfig{
			Identity:      id,
			EventRecorder: c.EventRecorder,
		})
	if err != nil {
		klog.Fatalf("error creating lock: %v", err)
	}

	// Try and become the leader and start cloud controller manager loops
	leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
		Lock:          rl,
		LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
		RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
		RetryPeriod:   c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: run,
			OnStoppedLeading: func() {
				klog.Fatalf("leaderelection lost")
			},
		},
		WatchDog: electionChecker,
		Name:     "cloud-controller-manager",
	})
	panic("unreachable")
}
```

5. startControllers()
This startControllers() is where the controllers are actually started.
https://sourcegraph.com/github.com/kubernetes/kubernetes@release-1.17/-/blob/cmd/cloud-controller-manager/app/controllermanager.go#L222-261
```go
// startControllers starts the cloud specific controller loops.
func startControllers(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}, cloud cloudprovider.Interface, controllers map[string]initFunc) error {
	// run the Cloud Provider's own initialization
	// Initialize the cloud provider with a reference to the clientBuilder
	cloud.Initialize(c.ClientBuilder, stopCh)

	// if the Cloud Provider implements the SetInformers method,
	// hand the SharedInformers over to it
	// Set the informer on the user cloud object
	if informerUserCloud, ok := cloud.(cloudprovider.InformerUser); ok {
		informerUserCloud.SetInformers(c.SharedInformers)
	}

	// start each controller that was passed in
	for controllerName, initFn := range controllers {
		if !genericcontrollermanager.IsControllerEnabled(controllerName, ControllersDisabledByDefault, c.ComponentConfig.Generic.Controllers) {
			klog.Warningf("%q is disabled", controllerName)
			continue
		}

		klog.V(1).Infof("Starting %q", controllerName)
		_, started, err := initFn(c, cloud, stopCh)
		if err != nil {
			klog.Errorf("Error starting %q", controllerName)
			return err
		}
		if !started {
			klog.Warningf("Skipping %q", controllerName)
			continue
		}
		klog.Infof("Started %q", controllerName)

		time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
	}

	// send a request to the API server's /healthz to check that it has come up properly
	// If apiserver is not running we should wait for some time and fail only then. This is particularly
	// important when we start apiserver and controller manager at the same time.
	if err := genericcontrollermanager.WaitForAPIServer(c.VersionedClient, 10*time.Second); err != nil {
		klog.Fatalf("Failed to wait for apiserver being healthy: %v", err)
	}

	// start the shared informers
	c.SharedInformers.Start(stopCh)

	select {}
}
```

## NodeController

1. Startup
At first glance, all this does is create a NodeController with New and call Run().
https://sourcegraph.com/github.com/kubernetes/kubernetes@release-1.17/-/blob/cmd/cloud-controller-manager/app/core.go#L41-58
```go
func startCloudNodeController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) {
	// Start the CloudNodeController
	nodeController, err := cloudcontrollers.NewCloudNodeController(
		ctx.SharedInformers.Core().V1().Nodes(), // the informer for the Node resource is passed in
		// cloud node controller uses existing cluster role from node-controller
		ctx.ClientBuilder.ClientOrDie("node-controller"),
		cloud,
		ctx.ComponentConfig.NodeStatusUpdateFrequency.Duration,
	)
	if err != nil {
		klog.Warningf("failed to start cloud node controller: %s", err)
		return nil, false, nil
	}

	go nodeController.Run(stopCh)

	return nil, true, nil
}
```

2. 
NewCloudNodeController
Here you can see that event handlers for the Node resource, one of the controller's reconciliation loops, are registered.
https://sourcegraph.com/github.com/kubernetes/kubernetes@release-1.17/-/blob/pkg/controller/cloud/node_controller.go#L99-L133
```go
// NewCloudNodeController creates a CloudNodeController object
func NewCloudNodeController(
	nodeInformer coreinformers.NodeInformer,
	kubeClient clientset.Interface,
	cloud cloudprovider.Interface,
	nodeStatusUpdateFrequency time.Duration) (*CloudNodeController, error) {

	// create the event recorder (I believe this is what records the events you see with e.g. kubectl describe pod)
	eventBroadcaster := record.NewBroadcaster()
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"})
	eventBroadcaster.StartLogging(klog.Infof)

	klog.Infof("Sending events to api server.")
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

	// cloud.Instances() checks whether the Cloud Provider in use supports the Instances object,
	// which is what fetches instance information (the naming is confusing);
	// if it is not supported, NodeController is not started
	if _, ok := cloud.Instances(); !ok {
		return nil, errors.New("cloud provider does not support instances")
	}

	cnc := &CloudNodeController{
		nodeInformer:              nodeInformer,
		kubeClient:                kubeClient,
		recorder:                  recorder,
		cloud:                     cloud,
		nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
	}

	// register event handlers on the Node informer
	// handlers are set for AddFunc/UpdateFunc, so they run when a Node resource is created or updated
	// they call different methods: cnc.AddCloudNode and cnc.UpdateCloudNode respectively
	// Use shared informer to listen to add/update of nodes. Note that any nodes
	// that exist before node controller starts will show up in the update method
	cnc.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { cnc.AddCloudNode(context.TODO(), obj) },
		UpdateFunc: func(oldObj, newObj interface{}) { cnc.UpdateCloudNode(context.TODO(), oldObj, newObj) },
	})

	return cnc, nil
}
```
What cnc.AddCloudNode/cnc.UpdateCloudNode do is explained a little further down.

3. 
nodeController.Run()
Looking inside Run(), wait.Until() is used to call cnc.UpdateNodeStatus() once every cnc.nodeStatusUpdateFrequency.
In the Edge/Level Driven Trigger terminology, this is (I think) a reconciliation loop driven by the level-driven kind of trigger.
https://sourcegraph.com/github.com/kubernetes/kubernetes@release-1.17/-/blob/pkg/controller/cloud/node_controller.go#L135-L147
```go
// This controller updates newly registered nodes with information
// from the cloud provider. This call is blocking so should be called
// via a goroutine
func (cnc *CloudNodeController) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	// The following loops run communicate with the APIServer with a worst case complexity
	// of O(num_nodes) per cycle. These functions are justified here because these events fire
	// very infrequently. DO NOT MODIFY this to perform frequent operations.

	// Start a loop to periodically update the node addresses obtained from the cloud
	wait.Until(func() { cnc.UpdateNodeStatus(context.TODO()) }, cnc.nodeStatusUpdateFrequency, stopCh)
}
```

4. Quick summary
From what we have seen so far, NodeController has the following two kinds of reconciliation loop:
- AddCloudNode/UpdateCloudNode, run by the event handlers that fire on Node resource events
- UpdateNodeStatus, run by wait.Until()

5. 
Since it is the simpler one, UpdateNodeStatus first
https://sourcegraph.com/github.com/kubernetes/kubernetes@release-1.17/-/blob/pkg/controller/cloud/node_controller.go#L149-L173
```go
// UpdateNodeStatus updates the node status, such as node addresses
func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) {
	// get the Instances object used to fetch instance information from the Cloud Provider
	instances, ok := cnc.cloud.Instances()
	if !ok {
		utilruntime.HandleError(fmt.Errorf("failed to get instances from cloud provider"))
		return
	}

	// list the Nodes
	// with ResourceVersion: "0" the list is apparently served from the API server's cache
	nodes, err := cnc.kubeClient.CoreV1().Nodes().List(metav1.ListOptions{ResourceVersion: "0"})
	if err != nil {
		klog.Errorf("Error monitoring node status: %v", err)
		return
	}

	// run updateNodeAddress() for each Node
	for i := range nodes.Items {
		cnc.updateNodeAddress(ctx, &nodes.Items[i], instances)
	}

	// run reconcileNodeLabels() for each Node
	for _, node := range nodes.Items {
		err = cnc.reconcileNodeLabels(node.Name)
		if err != nil {
			klog.Errorf("Error reconciling node labels for node %q, err: %v", node.Name, err)
		}
	}
}
```

6. updateNodeAddress
It looks long at first glance, but all it does is update Node.Status.Addresses of the Node resource with the values fetched from the cloud (exactly what the method name says).
https://sourcegraph.com/github.com/kubernetes/kubernetes@release-1.17/-/blob/pkg/controller/cloud/node_controller.go#L225-288
```go
// UpdateNodeAddress updates the nodeAddress of a single node
func (cnc *CloudNodeController) updateNodeAddress(ctx context.Context, node *v1.Node, instances cloudprovider.Instances) {
	// look for the "node.cloudprovider.kubernetes.io/uninitialized" taint on the node
	// kubelet adds this taint when it is started with --cloud-provider=external
	// if the taint is present, do nothing, i.e. only process nodes whose initialization has completed
	// Do not process nodes that are still tainted
	cloudTaint := getCloudTaint(node.Spec.Taints)
	if cloudTaint != nil {
		klog.V(5).Infof("This node %s is still tainted. Will not process.", node.Name)
		return
	}

	// check whether the instance identified by node.Spec.ProviderID, or the one found via instances.InstanceID(node.Name), exists
	// if it does not exist, do nothing
	// this tells us that node.Name (the node name) has to be a name that uniquely identifies the instance on the cloud side
	// Node that isn't present according to the cloud provider shouldn't have its address updated
	exists, err := ensureNodeExistsByProviderID(ctx, instances, node)
	if err != nil {
		// Continue to update node address when not sure the node is not exists
		klog.Errorf("%v", err)
	} else if !exists {
		klog.V(4).Infof("The node %s is no longer present according to the cloud provider, do not process.", node.Name)
		return
	}

	// fetch the node's addresses (private IP etc.) based on node.Spec.ProviderID or node.Name
	nodeAddresses, err := getNodeAddressesByProviderIDOrName(ctx, instances, node)
	if err != nil {
		klog.Errorf("Error getting node addresses for node %q: %v", node.Name, err)
		return
	}

	if len(nodeAddresses) == 0 {
		klog.V(5).Infof("Skipping node address update for node %q since cloud provider did not return any", node.Name)
		return
	}

	// check whether nodeAddresses contains an address of type NodeHostName
	// the available types are:
	// NodeHostName    NodeAddressType = "Hostname"
	// NodeExternalIP  NodeAddressType = "ExternalIP"
	// NodeInternalIP  NodeAddressType = "InternalIP"
	// NodeExternalDNS NodeAddressType = "ExternalDNS"
	// NodeInternalDNS NodeAddressType = "InternalDNS"
	// Check if a hostname address exists in the cloud provided addresses
	hostnameExists := false
	for i := range nodeAddresses {
		if nodeAddresses[i].Type == v1.NodeHostName {
			hostnameExists = true
			break
		}
	}

	// if the check above found no NodeHostName, look for one in node.Status.Addresses and append it to nodeAddresses
	// I think node.Status.Addresses is set by kubelet (not sure)
	// If hostname was not present in cloud provided addresses, use the hostname
	// from the existing node (populated by kubelet)
	if !hostnameExists {
		for _, addr := range node.Status.Addresses {
			if addr.Type == v1.NodeHostName {
				nodeAddresses = append(nodeAddresses, addr)
			}
		}
	}

	// check whether the "alpha.kubernetes.io/provided-node-ip" annotation is set;
	// if it is, check that nodeAddresses contains the same value,
	// and if it does not, log an error and stop
	// "alpha.kubernetes.io/provided-node-ip" seems to be set by kubelet from the --node-ip flag
	// If nodeIP was suggested by user, ensure that
	// it can be found in the cloud as well (consistent with the behaviour in kubelet)
	if nodeIP, ok := ensureNodeProvidedIPExists(node, nodeAddresses); ok {
		if nodeIP == nil {
			klog.Errorf("Specified Node IP not found in cloudprovider for node %q", node.Name)
			return
		}
	}

	// compare node.Status.Addresses with nodeAddresses and stop here if nothing changed
	if !nodeAddressesChangeDetected(node.Status.Addresses, nodeAddresses) {
		return
	}

	// update Node.Status.Addresses with nodeAddresses
	newNode := node.DeepCopy()
	newNode.Status.Addresses = nodeAddresses
	_, _, err = nodeutil.PatchNodeStatus(cnc.kubeClient.CoreV1(), types.NodeName(node.Name), node, newNode)
	if err != nil {
		klog.Errorf("Error patching node with cloud ip addresses = [%v]", err)
	}
}
```

7. reconcileNodeLabels
In short, if the Node lacks a label under the new name (or its value differs), the value is copied over from the label under the old name.
This is probably there to support Kubernetes version upgrades.
https://sourcegraph.com/github.com/kubernetes/kubernetes@release-1.17/-/blob/pkg/controller/cloud/node_controller.go#L175-223
```go
// reconcileNodeLabels reconciles node labels transitioning from beta to GA
func (cnc *CloudNodeController) reconcileNodeLabels(nodeName string) error {
	// it takes nodeName as an argument, so why go out of its way to fetch the node from nodeInformer?
	node, err := cnc.nodeInformer.Lister().Get(nodeName)
	if err != nil {
		// If node not found, just ignore it.
		if apierrors.IsNotFound(err) {
			return nil
		}

		return err
	}

	if node.Labels == nil {
		// Nothing to reconcile.
		return nil
	}

	// in short: if the label under the new name is missing, copy the old label's value to the new one
	// "failure-domain.beta.kubernetes.io/zone" → "topology.kubernetes.io/zone"
	// "failure-domain.beta.kubernetes.io/region" → "topology.kubernetes.io/region"
	// "beta.kubernetes.io/instance-type" → "node.kubernetes.io/instance-type"
	labelsToUpdate := map[string]string{}
	for _, r := range labelReconcileInfo {
		primaryValue, primaryExists := node.Labels[r.primaryKey]
		secondaryValue, secondaryExists := node.Labels[r.secondaryKey]

		if !primaryExists {
			// The primary label key does not exist. This should not happen
			// within our supported version skew range, when no external
			// components/factors modifying the node object. Ignore this case.
			continue
		}
		if secondaryExists && primaryValue != secondaryValue {
			// Secondary label exists, but not consistent with the primary
			// label. Need to reconcile.
			labelsToUpdate[r.secondaryKey] = primaryValue

		} else if !secondaryExists && r.ensureSecondaryExists {
			// Apply secondary label based on primary label.
			labelsToUpdate[r.secondaryKey] = primaryValue
		}
	}

	if len(labelsToUpdate) == 0 {
		return nil
	}

	// overwrite the node's existing labels with labelsToUpdate and save (the internals are surprisingly long, so they are skipped here)
	if !cloudnodeutil.AddOrUpdateLabelsOnNode(cnc.kubeClient, labelsToUpdate, node) {
		return fmt.Errorf("failed update labels for node %+v", node)
	}

	return nil
}
```
That concludes the level-driven reconciliation loop.

8. AddCloudNode/UpdateCloudNode
Next, the edge-driven side.
If the node has no cloudTaint, meaning it has already been initialized, nothing is done.
If it has not been initialized yet, initializeNode is called.
```go
// AddCloudNode handles initializing new nodes registered with the cloud taint.
func (cnc *CloudNodeController) AddCloudNode(ctx context.Context, obj interface{}) {
	node := obj.(*v1.Node)

	cloudTaint := getCloudTaint(node.Spec.Taints)
	if cloudTaint == nil {
		klog.V(2).Infof("This node %s is registered without the cloud taint. Will not process.", node.Name)
		return
	}

	cnc.initializeNode(ctx, node)
}

func (cnc *CloudNodeController) UpdateCloudNode(ctx context.Context, _, newObj interface{}) {
	node, ok := newObj.(*v1.Node)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
		return
	}

	cloudTaint := getCloudTaint(node.Spec.Taints)
	if cloudTaint == nil {
		// The node has already been initialized so nothing to do.
		return
	}

	cnc.initializeNode(ctx, node)
}
```

9. initializeNode
https://sourcegraph.com/github.com/kubernetes/kubernetes@release-1.17/-/blob/pkg/controller/cloud/node_controller.go#L319-431
```go
// This processes nodes that were added into the cluster, and cloud initialize them if appropriate
func (cnc *CloudNodeController) initializeNode(ctx context.Context, node *v1.Node) {

	instances, ok := cnc.cloud.Instances()
	if !ok {
		utilruntime.HandleError(fmt.Errorf("failed to get instances from cloud provider"))
		return
	}

	// clientretry.RetryOnConflict seems to run the function passed as the 2nd argument and, if it fails
	// with a conflict error, retry it as many times as configured by the 1st argument (UpdateNodeSpecBackoff)
	// in short, it repeats the work a bounded number of times when the error is caused by a conflict
	err := clientretry.RetryOnConflict(UpdateNodeSpecBackoff, func() error {
		// TODO(wlan0): Move this logic to the route controller using the node taint instead of condition
		// Since there are node taints, do we still need this?
		// This condition marks the node as unusable until routes are initialized in the cloud provider
		if cnc.cloud.ProviderName() == "gce" {
			if err := cloudnodeutil.SetNodeCondition(cnc.kubeClient, types.NodeName(node.Name), v1.NodeCondition{
				Type:               v1.NodeNetworkUnavailable,
				Status:             v1.ConditionTrue,
				Reason:             "NoRouteCreated",
				Message:            "Node created without a route",
				LastTransitionTime: metav1.Now(),
			}); err != nil {
				return err
			}
		}

		// fetch the latest(?) Node again from the API server
		curNode, err := cnc.kubeClient.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{})
		if err != nil {
			return err
		}

		// do nothing if the node is already initialized
		cloudTaint := getCloudTaint(curNode.Spec.Taints)
		if cloudTaint == nil {
			// Node object received from event had the cloud taint but was outdated,
			// the node has actually already been initialized.
			return nil
		}

		// if Node.Spec.ProviderID is not set
		if curNode.Spec.ProviderID == "" {
			// internally the Cloud Provider's instances.InstanceID(ctx, nodeName) is called to look up the instance,
			// and a providerID in the format
			// cloud.ProviderName() + "://" + instanceID
			// is returned
			providerID, err := cloudprovider.GetInstanceProviderID(ctx, cnc.cloud, types.NodeName(curNode.Name))
			if err == nil {
				curNode.Spec.ProviderID = providerID
			} else if err == cloudprovider.NotImplemented {
				// whether instances.InstanceID() is implemented is up to the Cloud Provider,
				// so a "not implemented" error only results in a warning log
				// if the cloud provider being used does not support provider IDs,
				// we can safely continue since we will attempt to set node
				// addresses given the node name in getNodeAddressesByProviderIDOrName
				klog.Warningf("cloud provider does not set node provider ID, using node name to discover node %s", node.Name)
			} else {
				// if the cloud provider being used supports provider IDs, we want
				// to propagate the error so that we re-try in the future; if we
				// do not, the taint will be removed, and this will not be retried
				return err
			}
		}

		nodeAddresses, err := getNodeAddressesByProviderIDOrName(ctx, instances, curNode)
		if err != nil {
			return err
		}

		// If user provided an IP address, ensure that IP address is found
		// in the cloud provider before removing the taint on the node
		if nodeIP, ok := ensureNodeProvidedIPExists(curNode, nodeAddresses); ok {
			if nodeIP == nil {
				return errors.New("failed to find kubelet node IP from cloud provider")
			}
		}

		// internally, instances.InstanceTypeByProviderID(ctx, node.Spec.ProviderID) or
		// instances.InstanceType(ctx, types.NodeName(node.Name)) fetches the instance type (on AWS, things like t2.micro)
		// once fetched, it is set as a label
		//
		// the error handling here does not check for cloudprovider.NotImplemented, which tells us that
		// instances.InstanceType() or instances.InstanceTypeByProviderID()
		// has to be implemented so that it does not return an error (an empty instance type just adds no label)
		if instanceType, err := getInstanceTypeByProviderIDOrName(ctx, instances, curNode); err != nil {
			return err
		} else if instanceType != "" {
			klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelInstanceType, instanceType)
			curNode.ObjectMeta.Labels[v1.LabelInstanceType] = instanceType
			klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelInstanceTypeStable, instanceType)
			curNode.ObjectMeta.Labels[v1.LabelInstanceTypeStable] = instanceType
		}

		// when the Cloud Provider implements Zones:
		// inside getZoneByProviderIDOrName(), zones.GetZoneByProviderID(ctx, node.Spec.ProviderID) or
		// zones.GetZoneByNodeName(ctx, types.NodeName(node.Name)) fetches the zone information
		// once fetched, it is set as labels
		// for reference, the zone object looks like
		// type Zone struct {
		// 	FailureDomain string
		// 	Region        string
		// }
		// and FailureDomain holds the zone, so in practice this is zone + region
		//
		// the error handling here does not check for cloudprovider.NotImplemented, which tells us that
		// a provider that implements Zones has to implement zones.GetZoneByProviderID() or
		// zones.GetZoneByNodeName() so that it does not return an error
		if zones, ok := cnc.cloud.Zones(); ok {
			zone, err := getZoneByProviderIDOrName(ctx, zones, curNode)
			if err != nil {
				return fmt.Errorf("failed to get zone from cloud provider: %v", err)
			}
			if zone.FailureDomain != "" {
				klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneFailureDomain, zone.FailureDomain)
				curNode.ObjectMeta.Labels[v1.LabelZoneFailureDomain] = zone.FailureDomain
				klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneFailureDomainStable, zone.FailureDomain)
				curNode.ObjectMeta.Labels[v1.LabelZoneFailureDomainStable] = zone.FailureDomain
			}
			if zone.Region != "" {
				klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneRegion, zone.Region)
				curNode.ObjectMeta.Labels[v1.LabelZoneRegion] = zone.Region
				klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneRegionStable, zone.Region)
				curNode.ObjectMeta.Labels[v1.LabelZoneRegionStable] = zone.Region
			}
		}

		// the cloud taint is removed from the taints; once this change is saved, initialization is complete
		curNode.Spec.Taints = excludeCloudTaint(curNode.Spec.Taints)

		_, err = cnc.kubeClient.CoreV1().Nodes().Update(curNode)
		if err != nil {
			return err
		}
		// After adding, call UpdateNodeAddress to set the CloudProvider provided IPAddresses
		// So that users do not see any significant delay in IP addresses being filled into the node
		cnc.updateNodeAddress(ctx, curNode, instances)

		klog.Infof("Successfully initialized node %s with cloud provider", node.Name)
		return nil
	})
	if err != nil {
		utilruntime.HandleError(err)
		return
	}
}
```

## NodeLifecycleController

1. 
startCloudNodeLifecycleController
https://sourcegraph.com/github.com/kubernetes/kubernetes@release-1.17/-/blob/cmd/cloud-controller-manager/app/core.go#L60-77
```go
func startCloudNodeLifecycleController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) {
	// Start the cloudNodeLifecycleController
	cloudNodeLifecycleController, err := cloudcontrollers.NewCloudNodeLifecycleController(
		ctx.SharedInformers.Core().V1().Nodes(),
		// cloud node lifecycle controller uses existing cluster role from node-controller
		ctx.ClientBuilder.ClientOrDie("node-controller"),
		cloud,
		ctx.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration,
	)
	if err != nil {
		klog.Warningf("failed to start cloud node lifecycle controller: %s", err)
		return nil, false, nil
	}

	go cloudNodeLifecycleController.Run(stopCh)

	return nil, true, nil
}
```

2. NewCloudNodeLifecycleController
Unlike NodeController, no event handlers are registered here; only the lister of the Node informer is used.
https://sourcegraph.com/github.com/kubernetes/kubernetes@release-1.17/-/blob/pkg/controller/cloud/node_lifecycle_controller.go#L67-101
```go
func NewCloudNodeLifecycleController(
	nodeInformer coreinformers.NodeInformer,
	kubeClient clientset.Interface,
	cloud cloudprovider.Interface,
	nodeMonitorPeriod time.Duration) (*CloudNodeLifecycleController, error) {

	eventBroadcaster := record.NewBroadcaster()
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-lifecycle-controller"})
	eventBroadcaster.StartLogging(klog.Infof)

	klog.Info("Sending events to api server")
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

	if kubeClient == nil {
		return nil, errors.New("kubernetes client is nil")
	}

	if cloud == nil {
		return nil, errors.New("no cloud provider provided")
	}

	if _, ok := cloud.Instances(); !ok { // checks, among other things, that Instances is available
		return nil, errors.New("cloud provider does not support instances")
	}

	c := 
&CloudNodeLifecycleController{
		kubeClient:        kubeClient,
		nodeLister:        nodeInformer.Lister(),
		recorder:          recorder,
		cloud:             cloud,
		nodeMonitorPeriod: nodeMonitorPeriod,
	}

	return c, nil
}
```

3. `*CloudNodeLifecycleController.Run()`
The reconciliation loop is run with `wait.Until()`.
https://sourcegraph.com/github.com/kubernetes/kubernetes@release-1.17/-/blob/pkg/controller/cloud/node_lifecycle_controller.go#L103-115
```go
// Run starts the main loop for this controller. Run is blocking so should
// be called via a goroutine
func (c *CloudNodeLifecycleController) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	// The following loops run communicate with the APIServer with a worst case complexity
	// of O(num_nodes) per cycle. These functions are justified here because these events fire
	// very infrequently. DO NOT MODIFY this to perform frequent operations.

	// Start a loop to periodically check if any nodes have been
	// deleted or shutdown from the cloudprovider
	wait.Until(c.MonitorNodes, c.nodeMonitorPeriod, stopCh)
}
```

4. `*CloudNodeLifecycleController.MonitorNodes()`
https://sourcegraph.com/github.com/kubernetes/kubernetes@release-1.17/-/blob/pkg/controller/cloud/node_lifecycle_controller.go#L117-197
```go
// MonitorNodes checks to see if nodes in the cluster have been deleted
// or shutdown. If deleted, it deletes the node resource. If shutdown it
// applies a shutdown taint to the node
func (c *CloudNodeLifecycleController) MonitorNodes() {
	instances, ok := c.cloud.Instances()
	if !ok {
		utilruntime.HandleError(fmt.Errorf("failed to get instances from cloud provider"))
		return
	}

	nodes, err := c.nodeLister.List(labels.Everything()) // the Node list is fetched here
	if err != nil {
		klog.Errorf("error listing nodes from cache: %s", err)
		return
	}

	// loop over the Node list
	for _, node := range nodes {
		// put the status of the "Ready" condition into the status variable
		// Default NodeReady status to v1.ConditionUnknown
		status := v1.ConditionUnknown
		if _, c := cloudnodeutil.GetNodeCondition(&node.Status, v1.NodeReady); c != nil {
			status = c.Status
		}

		// when the "Ready" status is "True"
		if status == v1.ConditionTrue {
			// if the "node.cloudprovider.kubernetes.io/shutdown: NoSchedule" taint is present, remove it
			// if taint exist remove taint
			err = controller.RemoveTaintOffNode(c.kubeClient, node.Name, node, ShutdownTaint)
			if err != nil {
				klog.Errorf("error patching node taints: %v", err)
			}
			// a node whose "Ready" status is "True" ends up here, so the rest is skipped for it
			continue
		}

		// we need to check this first to get taint working in similar in all cloudproviders
		// current problem is that shutdown nodes are not working in similar way ie. all cloudproviders
		// does not delete node from kubernetes cluster when instance it is shutdown see issue #46442
		shutdown, err := shutdownInCloudProvider(context.TODO(), c.cloud, node)
		if err != nil {
			klog.Errorf("error checking if node %s is shutdown: %v", node.Name, err)
		}

		// if the node has been shut down on the cloud side,
		// add the "node.cloudprovider.kubernetes.io/shutdown: NoSchedule" taint
		if shutdown && err == nil {
			// if node is shutdown add shutdown taint
			err = controller.AddOrUpdateTaintOnNode(c.kubeClient, node.Name, ShutdownTaint)
			if err != nil {
				klog.Errorf("failed to apply shutdown taint to node %s, it may have been deleted.", node.Name)
			}
			// a shut-down node is not processed any further
			// Continue checking the remaining nodes since the current one is shutdown.
			continue
		}

		// From here on: if the node has already been deleted on the cloud side, delete the Node object
		// At this point the node has NotReady status, we need to check if the node has been removed
		// from the cloud provider. If node cannot be found in cloudprovider, then delete the node
		exists, err := ensureNodeExistsByProviderID(context.TODO(), instances, node)
		if err != nil {
			klog.Errorf("error checking if node %s exists: %v", node.Name, err)
			continue
		}

		if exists {
			// Continue checking the remaining nodes since the current one is fine.
			continue
		}

		klog.V(2).Infof("deleting node since it is no longer present in cloud provider: %s", node.Name)

		ref := &v1.ObjectReference{
			Kind:      "Node",
			Name:      node.Name,
			UID:       types.UID(node.UID),
			Namespace: "",
		}

		c.recorder.Eventf(ref, v1.EventTypeNormal,
			fmt.Sprintf("Deleting node %v because it does not exist in the cloud provider", node.Name),
			"Node %s event: %s", node.Name, deleteNodeEvent)

		if err := c.kubeClient.CoreV1().Nodes().Delete(node.Name, nil); err != nil {
			klog.Errorf("unable to delete node %q: %v", node.Name, err)
		}
	}
}
```

## ServiceController

1. startServiceController

Startup of the ServiceController.

https://sourcegraph.com/github.com/kubernetes/kubernetes@release-1.17/-/blob/cmd/cloud-controller-manager/app/core.go#L79-97

```go
func startServiceController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) {
	// The event handlers for Service resources are set up here
	// Start the service controller
	serviceController, err := servicecontroller.New(
		cloud,
		ctx.ClientBuilder.ClientOrDie("service-controller"),
		ctx.SharedInformers.Core().V1().Services(),
		ctx.SharedInformers.Core().V1().Nodes(),
		ctx.ComponentConfig.KubeCloudShared.ClusterName,
	)
	if err != nil {
		// This error shouldn't fail. It lives like this as a legacy.
		klog.Errorf("Failed to start service controller: %v", err)
		return nil, false, nil
	}

	go serviceController.Run(stopCh, int(ctx.ComponentConfig.ServiceController.ConcurrentServiceSyncs))

	return nil, true, nil
}
```

2. *service.Controller.Run()

https://sourcegraph.com/github.com/kubernetes/kubernetes@release-1.17/-/blob/pkg/controller/service/controller.go#L193-221

```go
// Run starts a background goroutine that watches for changes to services that
// have (or had) LoadBalancers=true and ensures that they have
// load balancers created and deleted appropriately.
// serviceSyncPeriod controls how often we check the cluster's services to
// ensure that the correct load balancers exist.
// nodeSyncPeriod controls how often we check the cluster's nodes to determine
// if load balancers need to be updated to point to a new set.
//
// It's an error to call Run() more than once for a given ServiceController
// object.
func (s *Controller) Run(stopCh <-chan struct{}, workers int) {
	defer runtime.HandleCrash()
	defer s.queue.ShutDown()

	klog.Info("Starting service controller")
	defer klog.Info("Shutting down service controller")

	// Wait for the informer caches to sync
	if !cache.WaitForNamedCacheSync("service", stopCh, s.serviceListerSynced, s.nodeListerSynced) {
		return
	}

	// Start the workers
	for i := 0; i < workers; i++ {
		go wait.Until(s.worker, time.Second, stopCh)
	}

	// Start nodeSyncLoop
	go wait.Until(s.nodeSyncLoop, nodeSyncPeriod, stopCh)

	<-stopCh
}
```

### worker

This is the main path: it creates, updates, and deletes load balancers in response to Service events.

1. worker() to processNextWorkItem()

As in the other controllers, this just runs the reconcile function, here `syncService()`.

https://sourcegraph.com/github.com/kubernetes/kubernetes@release-1.17/-/blob/pkg/controller/service/controller.go#L223-246

```go
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (s *Controller) worker() {
	for s.processNextWorkItem() {
	}
}

func (s *Controller) processNextWorkItem() bool {
	key, quit := s.queue.Get()
	if quit {
		return false
	}
	defer s.queue.Done(key)

	err := s.syncService(key.(string))
	if err == nil {
		s.queue.Forget(key)
		return true
	}

	runtime.HandleError(fmt.Errorf("error processing service %v (will retry): %v", key, err))
	s.queue.AddRateLimited(key)
	return true
}
```

2. syncService()

Dispatches to a different method depending on the state of the Service.

https://sourcegraph.com/github.com/kubernetes/kubernetes@release-1.17/-/blob/pkg/controller/service/controller.go#L731-758

```go
// syncService will sync the Service with the given key if it has had its expectations fulfilled,
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
// invoked concurrently with the same key.
func (s *Controller) syncService(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing service %q (%v)", key, time.Since(startTime))
	}()

	// Split the key into namespace and name
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	// Fetch the actual Service resource
	// service holds the latest service info from apiserver
	service, err := s.serviceLister.Services(namespace).Get(name)
	switch {
	case errors.IsNotFound(err):
		// Not found means the Service resource has already been deleted, so delete the actual LB based on the key
		// service absence in store means watcher caught the deletion, ensure LB info is cleaned
		err = s.processServiceDeletion(key)
	case err != nil:
		// Any error other than "NotFound" is reported as an error
		runtime.HandleError(fmt.Errorf("Unable to retrieve service %v from store: %v", key, err))
	default:
		// The resource was found, so create or update the LB
		err = s.processServiceCreateOrUpdate(service, key)
	}

	return err
}
```

3. processServiceDeletion() to processLoadBalancerDelete()

Eventually, inside `processLoadBalancerDelete()`, the load balancer is deleted by calling `EnsureLoadBalancerDeleted()` on the Cloud Provider's load balancer implementation.

https://sourcegraph.com/github.com/kubernetes/kubernetes@release-1.17/-/blob/pkg/controller/service/controller.go#L760-789

```go
func (s *Controller) processServiceDeletion(key string) error {
	cachedService, ok := s.cache.get(key)
	if !ok {
		// Cache does not contains the key means:
		// - We didn't create a Load Balancer for the deleted service at all.
		// - We already deleted the Load Balancer that was created for the service.
		// In both cases we have nothing left to do.
		return nil
	}
	klog.V(2).Infof("Service %v has been deleted. Attempting to cleanup load balancer resources", key)
	if err := s.processLoadBalancerDelete(cachedService.state, key); err != nil {
		return err
	}
	s.cache.delete(key)
	return nil
}

func (s *Controller) processLoadBalancerDelete(service *v1.Service, key string) error {
	// delete load balancer info only if the service type is LoadBalancer
	if !wantsLoadBalancer(service) {
		return nil
	}
	s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
	if err := s.balancer.EnsureLoadBalancerDeleted(context.TODO(), s.clusterName, service); err != nil {
		s.eventRecorder.Eventf(service, v1.EventTypeWarning, "DeleteLoadBalancerFailed", "Error deleting load balancer: %v", err)
		return err
	}
	s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
	return nil
}
```

4. processServiceCreateOrUpdate()

If the cached Service has a different UID from the incoming one (the Service was deleted and re-created within a short period, which can only happen without a finalizer), the old LB is first deleted with `processLoadBalancerDelete()`.
After that, `syncLoadBalancerIfNeeded()` is always called; it handles both create/update and the finalizer-based deletion.

https://sourcegraph.com/github.com/kubernetes/kubernetes@release-1.17/-/blob/pkg/controller/service/controller.go#L262-290

```go
// processServiceCreateOrUpdate operates loadbalancers for the incoming service accordingly.
// Returns an error if processing the service update failed.
func (s *Controller) processServiceCreateOrUpdate(service *v1.Service, key string) error {
	// If the cached Service has a different UID, the Service was deleted and re-created, so delete the old LB first
	// TODO(@MrHohn): Remove the cache once we get rid of the non-finalizer deletion
	// path. Ref https://github.com/kubernetes/enhancements/issues/980.
	cachedService := s.cache.getOrCreate(key)
	if cachedService.state != nil && cachedService.state.UID != service.UID {
		// This happens only when a service is deleted and re-created
		// in a short period, which is only possible when it doesn't
		// contain finalizer.
		if err := s.processLoadBalancerDelete(cachedService.state, key); err != nil {
			return err
		}
	}
	// `syncLoadBalancerIfNeeded()`, which creates the LB among other things, is called here
	// Always cache the service, we need the info for service deletion in case
	// when load balancer cleanup is not handled via finalizer.
	cachedService.state = service
	op, err := s.syncLoadBalancerIfNeeded(service, key)
	if err != nil {
		s.eventRecorder.Eventf(service, v1.EventTypeWarning, "SyncLoadBalancerFailed", "Error syncing load balancer: %v", err)
		return err
	}
	if op == deleteLoadBalancer {
		// Only delete the cache upon successful load balancer deletion.
		s.cache.delete(key)
	}

	return nil
}
```

5. syncLoadBalancerIfNeeded() to ensureLoadBalancer()

- If deletion is needed, delete the LB
- Otherwise, create or update it
- Finally, update the status of the Service resource

```go
// syncLoadBalancerIfNeeded ensures that service's status is synced up with loadbalancer
// i.e. creates loadbalancer for service if requested and deletes loadbalancer if the service
// doesn't want a loadbalancer no more. Returns whatever error occurred.
func (s *Controller) syncLoadBalancerIfNeeded(service *v1.Service, key string) (loadBalancerOperation, error) {
	// Note: It is safe to just call EnsureLoadBalancer. But, on some clouds that requires a delete & create,
	// which may involve service interruption. Also, we would like user-friendly events.
	// Save the state so we can avoid a write if it doesn't change
	previousStatus := service.Status.LoadBalancer.DeepCopy()
	var newStatus *v1.LoadBalancerStatus
	var op loadBalancerOperation
	var err error

	if !wantsLoadBalancer(service) || needsCleanup(service) {
		// Deletion path that relies on the finalizer
		// Delete the load balancer if service no longer wants one, or if service needs cleanup.
		op = deleteLoadBalancer
		newStatus = &v1.LoadBalancerStatus{}
		_, exists, err := s.balancer.GetLoadBalancer(context.TODO(), s.clusterName, service)
		if err != nil {
			return op, fmt.Errorf("failed to check if load balancer exists before cleanup: %v", err)
		}
		if exists {
			klog.V(2).Infof("Deleting existing load balancer for service %s", key)
			s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
			if err := s.balancer.EnsureLoadBalancerDeleted(context.TODO(), s.clusterName, service); err != nil {
				return op, fmt.Errorf("failed to delete load balancer: %v", err)
			}
		}
		// Always remove finalizer when load balancer is deleted, this ensures Services
		// can be deleted after all corresponding load balancer resources are deleted.
		if err := s.removeFinalizer(service); err != nil {
			return op, fmt.Errorf("failed to remove load balancer cleanup finalizer: %v", err)
		}
		s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
	} else {
		// Call ensureLoadBalancer() to create or update the LB
		// Create or update the load balancer if service wants one.
		op = ensureLoadBalancer
		klog.V(2).Infof("Ensuring load balancer for service %s", key)
		s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuringLoadBalancer", "Ensuring load balancer")

		// The finalizer is added here
		// Always add a finalizer prior to creating load balancers, this ensures Services
		// can't be deleted until all corresponding load balancer resources are also deleted.
		if err := s.addFinalizer(service); err != nil {
			return op, fmt.Errorf("failed to add load balancer cleanup finalizer: %v", err)
		}

		// Create or update the LB
		newStatus, err = s.ensureLoadBalancer(service)
		if err != nil {
			if err == cloudprovider.ImplementedElsewhere {
				// ImplementedElsewhere indicates that the ensureLoadBalancer is a nop and the
				// functionality is implemented by a different controller. In this case, we
				// return immediately without doing anything.
				klog.V(4).Infof("LoadBalancer for service %s implemented by a different controller %s, Ignoring error", key, s.cloud.ProviderName())
				return op, nil
			}
			return op, fmt.Errorf("failed to ensure load balancer: %v", err)
		}
		s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuredLoadBalancer", "Ensured load balancer")
	}

	// Update the status
	if err := s.patchStatus(service, previousStatus, newStatus); err != nil {
		// Only retry error that isn't not found:
		// - Not found error mostly happens when service disappears right after
		//   we remove the finalizer.
		// - We can't patch status on non-exist service anyway.
		if !errors.IsNotFound(err) {
			return op, fmt.Errorf("failed to update load balancer status: %v", err)
		}
	}

	return op, nil
}

func (s *Controller) ensureLoadBalancer(service *v1.Service) (*v1.LoadBalancerStatus, error) {
	// Get the list of nodes the LB should send traffic to
	nodes, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate())
	if err != nil {
		return nil, err
	}

	// If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it.
	if len(nodes) == 0 {
		s.eventRecorder.Event(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer")
	}

	// Pass the node list and the Service to `EnsureLoadBalancer()` on the Cloud Provider's load balancer to create or update the LB
	// - Only one protocol supported per service
	// - Not all cloud providers support all protocols and the next step is expected to return
	//   an error for unsupported protocols
	return s.balancer.EnsureLoadBalancer(context.TODO(), s.clusterName, service, nodes)
}
```

### nodeSyncLoop

This one appears to run periodically so that changes to the set of nodes are reflected in the load balancers.

1. nodeSyncLoop()

https://sourcegraph.com/github.com/kubernetes/kubernetes@release-1.17/-/blob/pkg/controller/service/controller.go#L641-668

```go
// nodeSyncLoop handles updating the hosts pointed to by all load
// balancers whenever the set of nodes in the cluster changes.
func (s *Controller) nodeSyncLoop() {
	// Get the list of nodes the LB should send traffic to
	newHosts, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate())
	if err != nil {
		runtime.HandleError(fmt.Errorf("Failed to retrieve current set of nodes from node lister: %v", err))
		return
	}

	// If the hosts are unchanged since the previous loop, only retry the Services whose LB update failed last time
	if nodeSlicesEqualForLB(newHosts, s.knownHosts) {
		// The set of nodes in the cluster hasn't changed, but we can retry
		// updating any services that we failed to update last time around.
		s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
		return
	}

	klog.V(2).Infof("Detected change in list of current cluster nodes. New node set: %v", nodeNames(newHosts))

	// If the hosts have changed, update the LBs of all Services
	// Try updating all services, and save the ones that fail to try again next
	// round.
	s.servicesToUpdate = s.cache.allServices()
	numServices := len(s.servicesToUpdate)
	s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
	klog.V(2).Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes", numServices-len(s.servicesToUpdate), numServices)

	s.knownHosts = newHosts
}
```

## RouteController

1. startRouteController()

https://sourcegraph.com/github.com/kubernetes/kubernetes@release-1.17/-/blob/cmd/cloud-controller-manager/app/core.go#L99-143

```go
func startRouteController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) {
	if !ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs || !ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes {
		klog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs, ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes)
		return nil, false, nil
	}

	// If CIDRs should be allocated for pods and set on the CloudProvider, then start the route controller
	routes, ok := cloud.Routes()
	if !ok {
		klog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
		return nil, false, nil
	}

	// This seems to parse the ClusterCIDR setting and determine whether it is dual-stack
	// ClusterCIDR: https://kubernetes.io/docs/reference/command-line-tools-reference/kube-proxy/
	// failure: bad cidrs in config
	clusterCIDRs, dualStack, err := processCIDRs(ctx.ComponentConfig.KubeCloudShared.ClusterCIDR)
	if err != nil {
		return nil, false, err
	}

	// failure: more than one cidr and dual stack is not enabled
	if len(clusterCIDRs) > 1 && !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.IPv6DualStack) {
		return nil, false, fmt.Errorf("len of ClusterCIDRs==%v and dualstack feature is not enabled", len(clusterCIDRs))
	}

	// failure: more than one cidr but they are not configured as dual stack
	if len(clusterCIDRs) > 1 && !dualStack {
		return nil, false, fmt.Errorf("len of ClusterCIDRs==%v and they are not configured as dual stack (at least one from each IPFamily", len(clusterCIDRs))
	}

	// failure: more than cidrs is not allowed even with dual stack
	if len(clusterCIDRs) > 2 {
		return nil, false, fmt.Errorf("length of clusterCIDRs is:%v more than max allowed of 2", len(clusterCIDRs))
	}

	routeController := routecontroller.New(
		routes,
		ctx.ClientBuilder.ClientOrDie("route-controller"),
		ctx.SharedInformers.Core().V1().Nodes(),
		ctx.ComponentConfig.KubeCloudShared.ClusterName,
		clusterCIDRs,
	)
	go routeController.Run(stopCh, ctx.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration)

	return nil, true, nil
}
```

2. routeController.Run() to reconcileNodeRoutes()

Eventually runs `reconcile()`, which does the actual reconciliation.

https://sourcegraph.com/github.com/kubernetes/kubernetes@release-1.17/-/blob/pkg/controller/route/route_controller.go#L97-123

```go
func (rc *RouteController) Run(stopCh <-chan struct{}, syncPeriod time.Duration) {
	defer utilruntime.HandleCrash()

	klog.Info("Starting route controller")
	defer klog.Info("Shutting down route controller")

	if !cache.WaitForNamedCacheSync("route", stopCh, rc.nodeListerSynced) {
		return
	}

	if rc.broadcaster != nil {
		rc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: rc.kubeClient.CoreV1().Events("")})
	}

	// TODO: If we do just the full Resync every 5 minutes (default value)
	// that means that we may wait up to 5 minutes before even starting
	// creating a route for it. This is bad.
	// We should have a watch on node and if we observe a new node (with CIDR?)
	// trigger reconciliation for that node.
	go wait.NonSlidingUntil(func() {
		if err := rc.reconcileNodeRoutes(); err != nil {
			klog.Errorf("Couldn't reconcile node routes: %v", err)
		}
	}, syncPeriod, stopCh)

	<-stopCh
}

func (rc *RouteController) reconcileNodeRoutes() error {
	// Use `ListRoutes()` on the Cloud Provider's routes implementation to fetch routing information (e.g. VPC routes) from the cloud
	routeList, err := rc.routes.ListRoutes(context.TODO(), rc.clusterName)
	if err != nil {
		return fmt.Errorf("error listing routes: %v", err)
	}
	// Fetch all nodes
	nodes, err := rc.nodeLister.List(labels.Everything())
	if err != nil {
		return fmt.Errorf("error listing nodes: %v", err)
	}
	return rc.reconcile(nodes, routeList)
}
```

3. reconcile()

https://sourcegraph.com/github.com/kubernetes/kubernetes@release-1.17/-/blob/pkg/controller/route/route_controller.go#L137-289

```go
func (rc *RouteController) reconcile(nodes []*v1.Node, routes []*cloudprovider.Route) error {
	var l sync.Mutex
	// for each node a map of podCIDRs and their created status
	nodeRoutesStatuses := make(map[types.NodeName]map[string]bool)
	// routeMap maps routeTargetNode->route
	routeMap := make(map[types.NodeName][]*cloudprovider.Route)
	for _, route := range routes {
		if route.TargetNode != "" {
			routeMap[route.TargetNode] = append(routeMap[route.TargetNode], route)
		}
	}

	wg := sync.WaitGroup{}
	rateLimiter := make(chan struct{}, maxConcurrentRouteCreations)
	// searches existing routes by node for a matching route

	for _, node := range nodes {
		// Skip if the node hasn't been assigned a CIDR yet.
		if len(node.Spec.PodCIDRs) == 0 {
			continue
		}
		nodeName := types.NodeName(node.Name)
		l.Lock()
		nodeRoutesStatuses[nodeName] = make(map[string]bool)
		l.Unlock()
		// for every node, for every cidr
		for _, podCIDR := range node.Spec.PodCIDRs {
			// we add it to our nodeCIDRs map here because add and delete go routines run at the same time
			l.Lock()
			nodeRoutesStatuses[nodeName][podCIDR] = false
			l.Unlock()
			// ignore if already created
			if hasRoute(routeMap, nodeName, podCIDR) {
				l.Lock()
				nodeRoutesStatuses[nodeName][podCIDR] = true // a route for this podCIDR is already created
				l.Unlock()
				continue
			}
			// if we are here, then a route needs to be created for this node
			route := &cloudprovider.Route{
				TargetNode:      nodeName,
				DestinationCIDR: podCIDR,
			}
			// cloud providers that:
			// - depend on nameHint
			// - trying to support dual stack
			// will have to carefully generate new route names that allow node->(multi cidr)
			nameHint := string(node.UID)
			wg.Add(1)
			go func(nodeName types.NodeName, nameHint string, route *cloudprovider.Route) {
				defer wg.Done()
				// RetryOnConflict re-runs the function with backoff, but only when it returns a Conflict error
				err := clientretry.RetryOnConflict(updateNetworkConditionBackoff, func() error {
					startTime := time.Now()
					// Ensure that we don't have more than maxConcurrentRouteCreations
					// CreateRoute calls in flight.
					rateLimiter <- struct{}{}
					klog.Infof("Creating route for node %s %s with hint %s, throttled %v", nodeName, route.DestinationCIDR, nameHint, time.Since(startTime))
					// Call the Cloud Provider's `CreateRoute()` to set up the route
					err := rc.routes.CreateRoute(context.TODO(), rc.clusterName, nameHint, route)
					<-rateLimiter
					if err != nil {
						msg := fmt.Sprintf("Could not create route %s %s for node %s after %v: %v", nameHint, route.DestinationCIDR, nodeName, time.Since(startTime), err)
						if rc.recorder != nil {
							rc.recorder.Eventf(
								&v1.ObjectReference{
									Kind:      "Node",
									Name:      string(nodeName),
									UID:       types.UID(nodeName),
									Namespace: "",
								}, v1.EventTypeWarning, "FailedToCreateRoute", msg)
							klog.V(4).Infof(msg)
							return err
						}
					}
					l.Lock()
					nodeRoutesStatuses[nodeName][route.DestinationCIDR] = true
					l.Unlock()
					klog.Infof("Created route for node %s %s with hint %s after %v", nodeName, route.DestinationCIDR, nameHint, time.Since(startTime))
					return nil
				})
				if err != nil {
					klog.Errorf("Could not create route %s %s for node %s: %v", nameHint, route.DestinationCIDR, nodeName, err)
				}
			}(nodeName, nameHint, route)
		}
	}

	// searches our bag of node->cidrs for a match
	nodeHasCidr := func(nodeName types.NodeName, cidr string) bool {
		l.Lock()
		defer l.Unlock()

		nodeRoutes := nodeRoutesStatuses[nodeName]
		if nodeRoutes == nil {
			return false
		}
		_, exist := nodeRoutes[cidr]
		return exist
	}
	// delete routes that are not in use
	for _, route := range routes {
		if rc.isResponsibleForRoute(route) {
			// Check if this route is a blackhole, or applies to a node we know about & has an incorrect CIDR.
			if route.Blackhole || !nodeHasCidr(route.TargetNode, route.DestinationCIDR) {
				wg.Add(1)
				// Delete the route.
				go func(route *cloudprovider.Route, startTime time.Time) {
					defer wg.Done()
					// respect the rate limiter
					rateLimiter <- struct{}{}
					klog.Infof("Deleting route %s %s", route.Name, route.DestinationCIDR)
					// Delete the route that is no longer in use
					if err := rc.routes.DeleteRoute(context.TODO(), rc.clusterName, route); err != nil {
						klog.Errorf("Could not delete route %s %s after %v: %v", route.Name, route.DestinationCIDR, time.Since(startTime), err)
					} else {
						klog.Infof("Deleted route %s %s after %v", route.Name, route.DestinationCIDR, time.Since(startTime))
					}
					<-rateLimiter
				}(route, time.Now())
			}
		}
	}
	wg.Wait()

	// Finally, any node whose routes could not all be created gets its network condition marked as unavailable
	// after all routes have been created (or not), we start updating
	// all nodes' statuses with the outcome
	for _, node := range nodes {
		wg.Add(1)
		nodeRoutes := nodeRoutesStatuses[types.NodeName(node.Name)]
		allRoutesCreated := true

		if len(nodeRoutes) == 0 {
			go func(n *v1.Node) {
				defer wg.Done()
				klog.Infof("node %v has no routes assigned to it. NodeNetworkUnavailable will be set to true", n.Name)
				rc.updateNetworkingCondition(n, false)
			}(node)
			continue
		}

		// check if all routes were created. if so, then it should be ready
		for _, created := range nodeRoutes {
			if !created {
				allRoutesCreated = false
				break
			}
		}
		go func(n *v1.Node) {
			defer wg.Done()
			rc.updateNetworkingCondition(n, allRoutesCreated)
		}(node)
	}
	wg.Wait()
	return nil
}
```
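
`reconcile()` is long, but the core idea is a diff between the routes we want (one per node podCIDR) and the routes that exist on the cloud side. Here is a minimal sketch of just that diff, with the goroutines, rate limiting, and retries stripped out. Note that `Route` and `diffRoutes` are hypothetical names made up for this example (a simplified stand-in for `cloudprovider.Route`, not Kubernetes code), and the `isResponsibleForRoute()` check is left out on purpose.

```go
package main

import (
	"fmt"
	"sort"
)

// Route is a simplified, hypothetical stand-in for cloudprovider.Route.
type Route struct {
	TargetNode      string
	DestinationCIDR string
	Blackhole       bool
}

// diffRoutes is a hypothetical helper (not part of Kubernetes). Given the
// podCIDRs assigned to each node and the routes that currently exist in the
// cloud, it returns the routes to create and the routes to delete.
func diffRoutes(podCIDRs map[string][]string, existing []Route) (toCreate, toDelete []Route) {
	key := func(node, cidr string) string { return node + "|" + cidr }

	// Index the existing routes by node and CIDR (routes without a target node are skipped).
	have := make(map[string]bool)
	for _, r := range existing {
		if r.TargetNode != "" {
			have[key(r.TargetNode, r.DestinationCIDR)] = true
		}
	}

	// Sort node names so the result order is deterministic.
	nodes := make([]string, 0, len(podCIDRs))
	for n := range podCIDRs {
		nodes = append(nodes, n)
	}
	sort.Strings(nodes)

	// Every (node, podCIDR) pair is wanted; the ones with no existing route must be created.
	want := make(map[string]bool)
	for _, n := range nodes {
		for _, cidr := range podCIDRs[n] {
			want[key(n, cidr)] = true
			if !have[key(n, cidr)] {
				toCreate = append(toCreate, Route{TargetNode: n, DestinationCIDR: cidr})
			}
		}
	}

	// Blackhole routes and routes that no node wants any more must be deleted.
	for _, r := range existing {
		if r.Blackhole || !want[key(r.TargetNode, r.DestinationCIDR)] {
			toDelete = append(toDelete, r)
		}
	}
	return toCreate, toDelete
}

func main() {
	podCIDRs := map[string][]string{
		"node-a": {"10.0.0.0/24"},
		"node-b": {"10.0.1.0/24"},
	}
	existing := []Route{
		{TargetNode: "node-a", DestinationCIDR: "10.0.0.0/24"},
		{TargetNode: "node-c", DestinationCIDR: "10.0.2.0/24"},
	}
	toCreate, toDelete := diffRoutes(podCIDRs, existing)
	fmt.Println("create:", toCreate)
	fmt.Println("delete:", toDelete)
}
```

In this example `node-a` already has its route, `node-b` needs one created, and the route for `node-c` (a node that no longer exists) gets deleted. The real `reconcile()` does the same thing but calls `CreateRoute()` / `DeleteRoute()` concurrently and then records the outcome per node via `updateNetworkingCondition()`.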