直到 kube-apiserver 启动服务器为止

首先

这篇文章是CyberAgent 24卒内定者 Advent Calendar的第12天文章。

コンテナオーケストレーションツールである Kubernetes は様々なコンポーネントで構成されています。ユーザにとって馴染み深いコンポーネントの一つに client-go や kubectl などとやりとりする kube-apiserver があります。この kube-apiserver はその名の通り API Server でサーバーを起動してハンドラーを登録して Kubernetes のコアコンポーネントの役割を担っています。今回は kube-apiserver が実際にサーバーを起動して動くに至っているのかコードを追っていきたいと思います。

kube-apiserver とは

components-of-kubernetes.png

kube-apiserver简介

 2023-12-10 15.42.38.png

Kube-apiserver的实现

今回は kubernetes v1.28.2 を参考に追っていきたいと思います。

入口

kube-apiserver的入口点位于https://github.com/kubernetes/kubernetes/blob/v1.28.2/cmd/kube-apiserver/apiserver.go中,使用app.NewAPIServerCommand()函数创建并执行kube-apiserver的cobra.Command。

func main() {
	command := app.NewAPIServerCommand()
	code := cli.Run(command)
	os.Exit(code)
}

実行時の挙動

cli.Run(command)が呼ばれると https://github.com/kubernetes/kubernetes/blob/v1.28.2/cmd/kube-apiserver/app/server.go#L78 に記述されたcobra.CommandのRunEの実装が呼び出されます。RunE 内ではフラグやログなどの処理を行いサーバーを起動するRun関数を実行します。このRun関数が kube-apiserver のメインロジックを含む実装となっています。

func Run(opts options.CompletedOptions, stopCh <-chan struct{}) error {
	// To help debugging, immediately log version
	klog.Infof("Version: %+v", version.Get())

	klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))

	config, err := NewConfig(opts)
	if err != nil {
		return err
	}
	completed, err := config.Complete()
	if err != nil {
		return err
	}
	server, err := CreateServerChain(completed)
	if err != nil {
		return err
	}

	prepared, err := server.PrepareRun()
	if err != nil {
		return err
	}

	return prepared.Run(stopCh)
}

この関数のNewConfig()でサーバーの設定情報を格納する構造体を作成し、config.Complete()で足りない設定情報を埋めます。CreateServerChain()では前述した認証認可などのリクエストの前処理を行う ServerChain を作成し、PrepareRun()で Server を実行するのに必要な処理をいくつか実行し、Run()で実際にサーバーを起動します。これらの関数について掘り下げていきます。

新配置

NewConfig は https://github.com/kubernetes/kubernetes/blob/v1.28.2/cmd/kube-apiserver/app/config.go#L69 に実装があり API Server(Control Plane)、API Extension Server、Aggregator Server の設定用の Config 構造体を作成しています。

func NewConfig(opts options.CompletedOptions) (*Config, error) {
	c := &Config{
		Options: opts,
	}

	controlPlane, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(opts)
	if err != nil {
		return nil, err
	}
	c.ControlPlane = controlPlane

	apiExtensions, err := apiserver.CreateAPIExtensionsConfig(*controlPlane.GenericConfig, controlPlane.ExtraConfig.VersionedInformers, pluginInitializer, opts.CompletedOptions, opts.MasterCount,
		serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(controlPlane.ExtraConfig.ProxyTransport, controlPlane.GenericConfig.EgressSelector, controlPlane.GenericConfig.LoopbackClientConfig, controlPlane.GenericConfig.TracerProvider))
	if err != nil {
		return nil, err
	}
	c.ApiExtensions = apiExtensions

	aggregator, err := createAggregatorConfig(*controlPlane.GenericConfig, opts.CompletedOptions, controlPlane.ExtraConfig.VersionedInformers, serviceResolver, controlPlane.ExtraConfig.ProxyTransport, controlPlane.ExtraConfig.PeerProxy, pluginInitializer)
	if err != nil {
		return nil, err
	}
	c.Aggregator = aggregator

	return c, nil
}

配置完成

config.Complete函数的实现位于https://github.com/kubernetes/kubernetes/blob/v1.28.2/cmd/kube-apiserver/app/config.go#L56,它将各种服务器的Config结构体赋值给CompletedConfig,并填充了启动服务器所需的所有配置值。

func (c *Config) Complete() (CompletedConfig, error) {
	return CompletedConfig{&completedConfig{
		Options: c.Options,

		Aggregator:    c.Aggregator.Complete(),
		ControlPlane:  c.ControlPlane.Complete(),
		ApiExtensions: c.ApiExtensions.Complete(),

		ExtraConfig: c.ExtraConfig,
	}}, nil
}

CreateServerChain

CreateServerChain の実装は https://github.com/kubernetes/kubernetes/blob/v1.28.2/cmd/kube-apiserver/app/server.go#L174 に実装があり、API Extensions Server と API Server を作成し Aggregator Server に統合します。これにより API Extension Server や API Server で作成した HTTP ハンドラーなどを制御することを可能としています。

func CreateServerChain(config CompletedConfig) (*aggregatorapiserver.APIAggregator, error) {
	notFoundHandler := notfoundhandler.New(config.ControlPlane.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
	apiExtensionsServer, err := config.ApiExtensions.New(genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
	if err != nil {
		return nil, err
	}
	crdAPIEnabled := config.ApiExtensions.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"))

	kubeAPIServer, err := config.ControlPlane.New(apiExtensionsServer.GenericAPIServer)
	if err != nil {
		return nil, err
	}

	// aggregator comes last in the chain
	aggregatorServer, err := createAggregatorServer(config.Aggregator, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers, crdAPIEnabled)
	if err != nil {
		// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
		return nil, err
	}

	return aggregatorServer, nil
}

准备好跑

PreparedRun 在 https://github.com/kubernetes/kubernetes/blob/v1.28.2/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go#L400 进行了实现,它实现了启动聚合器服务器所需的组件。关于具体内容,本文不会详述,但如果你想了解更多信息,请参考聚合发现文档(aggregated discovery document)和Kubernetes API健康端点(Kubernetes API health endpoints)。

跑步

Run の実装は https://github.com/kubernetes/kubernetes/blob/v1.28.2/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go#L466 に記載しており、この関数が呼び出している s.runnable.Run の実装は https://github.com/kubernetes/kubernetes/blob/v1.28.2/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go#L496 に記載されています。ここではサーバーの起動に関しての実装が書かれておりここは本記事のメインとなるので詳しく説明していきたいと思います。

启动服务器处理

s.runnable.Runの実装 はとても長く読むのが大変ですがサーバー起動処理は以下の NonBlockingRun 関数にまとまっています。

stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(stopHttpServerCh, shutdownTimeout)
if err != nil {
    return err
}

NonBlockingRun は non-blocking-scoket を用いた非同期通信を行うサーバーを起動する関数で、このうち以下のような s.SecureServingInfo.Serve 関数にサーバー起動処理がまとまっています。

stoppedCh, listenerStoppedCh, err = s.SecureServingInfo.Serve(s.Handler, shutdownTimeout, internalStopCh)
if err != nil {
    close(internalStopCh)
    return nil, nil, err
}

そして、s.SecureServingInfo.Serve にサーバーに関する構造体や設定などを行う実装があり最終行に当たる RunServer でサーバーを起動します。最後に RunServer で server.Serveが呼び出されることでサーバーが起動します。

func RunServer(
	server *http.Server,
	ln net.Listener,
	shutDownTimeout time.Duration,
	stopCh <-chan struct{},
) (<-chan struct{}, <-chan struct{}, error) {
	if ln == nil {
		return nil, nil, fmt.Errorf("listener must not be nil")
	}

	// Shutdown server gracefully.
	serverShutdownCh, listenerStoppedCh := make(chan struct{}), make(chan struct{})
	go func() {
		defer close(serverShutdownCh)
		<-stopCh
		ctx, cancel := context.WithTimeout(context.Background(), shutDownTimeout)
		server.Shutdown(ctx)
		cancel()
	}()

	go func() {
		defer utilruntime.HandleCrash()
		defer close(listenerStoppedCh)

		var listener net.Listener
		listener = tcpKeepAliveListener{ln}
		if server.TLSConfig != nil {
			listener = tls.NewListener(listener, server.TLSConfig)
		}

		err := server.Serve(listener)

		msg := fmt.Sprintf("Stopped listening on %s", ln.Addr().String())
		select {
		case <-stopCh:
			klog.Info(msg)
		default:
			panic(fmt.Sprintf("%s due to error: %v", msg, err))
		}
	}()

	return serverShutdownCh, listenerStoppedCh, nil
}

まとめ

今回は kube-apiserver のエントリーポイントから実際にサーバーを起動させる処理まで内部実装を追っていきました。サーバー起動処理まで追うことで kube-apiserverの実装の全体像を掴めたのではないかと思います。今後機会があればハンドラーの処理や今回省略した内容も含めてまとめられたらと思います。最後までお付き合い頂きありがとうございました。

引用资料

    • Kubernetes

 

    • Kube API Server

 

    • Kube API Server

 

    • aggregated discovery document

 

    Kubernetes API health endpoints
广告
将在 10 秒后关闭
bannerAds