通过 kube-proxy 实现了解 iptables ~第一部分。kube-proxy 的基础~

首先

这篇文章是CyberAgent的2023届毕业生内定者Advent Calendar的第六篇文章。

大家都知道的容器平台 Kubernetes 是由多种组件组合而成的,其中一个组件是用于控制工作节点网络的 “kube-proxy”。kube-proxy 有几种工作模式来控制工作节点内的流量,本次我们将追踪并了解其中一种使用 Linux 网络功能的 “iptables” 工作模式的行为。

kube-proxy 是什么?

kube-proxy 是在每个工作节点上运行的进程,根据创建的 Service 资源等来进行工作节点内的网络设置,以实现与 Pod 的通信。

Screenshot 2023-12-03 at 12.06.21.png

具体设置网络的方法有三种,分别是“用户空间代理模式”、“iptables代理模式”和“IPVS代理模式”。本次将追踪使用iptables代理模式的实施。

iptables 是一种用于在 Linux 操作系统上设置和管理防火墙规则的软件。

iptables是Linux的网络功能,可以通过命令在内核层控制数据包过滤和路由等功能。iptables的实现是一个名为Netfilter的Linux模块,并且实际上iptables就是用于操作Netfilter的接口。[3]

简单来说,如果要阻止来自特定IP地址192.0.2.1的通信,可以使用如下命令来实现。

iptables -A INPUT -s 192.0.2.1 -j DROP

另外,如果只允许SSH通信,则可以如下设置。

iptables -A INPUT -p tcp -m tcp --dport 22 -j ACCEPT

此外,iptables 还可以通过规则修改数据包的内容,例如,如果要更改特定数据包的目标地址(目标网络地址转换),可以设置以下规则。

iptables -A INPUT -d 192.0.2.1/32 -p tcp -m tcp -j DNAT --to-destination 192.0.2.2:80

iptables代理模式的kube-proxy根据Service资源的状态,在每个工作节点上执行iptables命令,将流向Service IP地址的流量重定向到与该Service相关联的Pod。

Kube-proxy的实现

实施的概要

这里我们要确认 kube-proxy 的实现。kube-proxy 与其他 Kubernetes 组件一样,是用 Go 编写的,其源代码存放在 https://github.com/kubernetes/kubernetes 的代码库中。我们将重点关注 v1.28.4 的实现。

在进入具体实施之前,让我们先确认kube-proxy的实施概要。将其绘制成图形如下所示:

Screenshot 2023-12-03 at 15.43.00.png

kube-proxy的实体是一个名为ProxyServer的结构体,它拥有一些其他结构体,如Client和Proxier(还有其他结构体省略不写)。

ProxyServer通过Client和kube-apiserver进行持续监视Service和EndpointSlice等资源。当这些资源被创建或修改时,ProxyServer将检测到并执行Proxier的syncProxyRules()函数以同步资源状态和工作节点的iptables规则。另外,Proxier在启动时执行syncLoop()函数,即使资源没有发生变化,也会定期执行syncProxyRules()函数以进行资源状态和iptables规则的同步。

ProxyServer中的Proxier是由proxy.Provider接口定义的,该接口在https://github.com/kubernetes/kubernetes/blob/v1.28.4/pkg/proxy/types.go#L30中定义。它还具有config.ServiceHandler、config.EndpointSliceHandler接口来处理Service/EndpointSlice资源的变更,以及syncProxyRules()的包装器sync()和定期执行sync()的syncLoop()。

// #L30

type Provider interface {
	config.EndpointSliceHandler
	config.ServiceHandler
	config.NodeHandler

	// Sync immediately synchronizes the Provider's current state to proxy rules.
	Sync()
	// SyncLoop runs periodic work.
	// This is expected to run as a goroutine or as the main loop of the app.
	// It does not return.
	SyncLoop()
}

kube-proxy的工作模式不同,Proxier的实现也会有所不同。例如,对于本次讨论的iptables代理模式,https://github.com/kubernetes/kubernetes/blob/v1.28.4/pkg/proxy/iptables/proxier.go 是Proxier的实现。

入口

让我们从入口点开始追踪 kube-proxy 的实现。

kube-proxy的入口点位于https://github.com/kubernetes/kubernetes/blob/v1.28.4/cmd/kube-proxy/proxy.go,并且在这里只是创建了一个cobra.Command,并调用Run()方法。

func main() {
	command := app.NewProxyCommand()
	code := cli.Run(command)
	os.Exit(code)
}

创建并启动ProxyServer

在入口点创建的 cobra.Command 中注册了 ProxyServer 的创建过程,其实现在 https://github.com/kubernetes/kubernetes/blob/v1.28.4/cmd/kube-proxy/app/server.go#L358 中通过 newProxyServer() 创建 ProxyServer,最后通过执行 o.runLoop() 来启动创建的 ProxyServer。

// #L358

// Run runs the specified ProxyServer.
func (o *Options) Run() error {
	defer close(o.errCh)
	if len(o.WriteConfigTo) > 0 {
		return o.writeConfigFile()
	}

	err := platformCleanup(o.config.Mode, o.CleanupAndExit)
	if o.CleanupAndExit {
		return err
	}
	// We ignore err otherwise; the cleanup is best-effort, and the backends will have
	// logged messages if they failed in interesting ways.

	proxyServer, err := newProxyServer(o.config, o.master, o.InitAndExit)
	if err != nil {
		return err
	}
	if o.InitAndExit {
		return nil
	}

	o.proxyServer = proxyServer
	return o.runLoop()
}
// #L378

// runLoop will watch on the update change of the proxy server's configuration file.
// Return an error when updated
func (o *Options) runLoop() error {
	if o.watcher != nil {
		o.watcher.Run()
	}

	// run the proxy in goroutine
	go func() {
		err := o.proxyServer.Run()
		o.errCh <- err
	}()

	for {
		err := <-o.errCh
		if err != nil {
			return err
		}
	}
}

在newProxyServer()函数中,通过kube-apiserver创建Client以监视Service资源等,并通过createClient()和createProxier()创建Proxier结构来进行实际的网络设置。

// #L581

// newProxyServer creates a ProxyServer based on the given config
func newProxyServer(config *kubeproxyconfig.KubeProxyConfiguration, master string, initOnly bool) (*ProxyServer, error) {
	s := &ProxyServer{Config: config}

    // (中略)

	s.Client, err = createClient(config.ClientConnection, master)
	if err != nil {
		return nil, err
	}

	// (中略)

	s.Proxier, err = s.createProxier(config, dualStackSupported, initOnly)
	if err != nil {
		return nil, err
	}

	return s, nil
}

创建并启动Proxier

createProxier() 的实现位于 https://github.com/kubernetes/kubernetes/blob/v1.28.4/cmd/kube-proxy/app/server_others.go#L127 ,根据 kube-proxy 的工作模式和是否启用 dualStack(同时使用 IPv4/6)进行分支判断,选择返回哪个 Proxier 的实现。在单栈的 iptables 代理模式下,实现如下部分。

// #L127

// createProxier creates the proxy.Provider
func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguration, dualStack bool) (proxy.Provider, error) {
	var proxier proxy.Provider
	var err error

	// (中略)

	if config.Mode == proxyconfigapi.ProxyModeIPTables {
		klog.InfoS("Using iptables Proxier")

		if dualStack {
			// dualStack の場合の Proxier 作成処理
            // (中略)
		} else {
			// Create a single-stack proxier if and only if the node does not support dual-stack (i.e, no iptables support).
			var localDetector proxyutiliptables.LocalTrafficDetector
			localDetector, err = getLocalDetector(s.PrimaryIPFamily, config.DetectLocalMode, config, s.podCIDRs)
			if err != nil {
				return nil, fmt.Errorf("unable to create proxier: %v", err)
			}

			// TODO this has side effects that should only happen when Run() is invoked.
			proxier, err = iptables.NewProxier(
				s.PrimaryIPFamily,
				iptInterface,
				utilsysctl.New(),
				execer,
				config.IPTables.SyncPeriod.Duration,
				config.IPTables.MinSyncPeriod.Duration,
				config.IPTables.MasqueradeAll,
				*config.IPTables.LocalhostNodePorts,
				int(*config.IPTables.MasqueradeBit),
				localDetector,
				s.Hostname,
				s.NodeIPs[s.PrimaryIPFamily],
				s.Recorder,
				s.HealthzServer,
				config.NodePortAddresses,
			)
		}

		if err != nil {
			return nil, fmt.Errorf("unable to create proxier: %v", err)
		}
	} else if config.Mode == proxyconfigapi.ProxyModeIPVS {
		// IPVA プロキシモードの場合の Proxier 作成処理
        // (中略)
	}

	return proxier, nil
}

iptables.NewProxier() 是一个使用iptables的Proxier创建方法,在https://github.com/kubernetes/kubernetes/blob/v1.28.4/pkg/proxy/iptables/proxier.go#L222中有实现。通过将自身的syncProxyRules()传递给async.NewBoundedFrequencyRunner()来返回要分配给Proxier的syncRunner结构体,以便在sync()执行时通过syncRunner执行syncProxyRules()。

// #L222

// NewProxier returns a new Proxier given an iptables Interface instance.
// Because of the iptables logic, it is assumed that there is only a single Proxier active on a machine.
// An error will be returned if iptables fails to update or acquire the initial lock.
// Once a proxier is created, it will keep iptables up to date in the background and
// will not terminate if a particular iptables call fails.
func NewProxier(ipFamily v1.IPFamily,
	ipt utiliptables.Interface,
	sysctl utilsysctl.Interface,
	exec utilexec.Interface,
	syncPeriod time.Duration,
	minSyncPeriod time.Duration,
	masqueradeAll bool,
	localhostNodePorts bool,
	masqueradeBit int,
	localDetector proxyutiliptables.LocalTrafficDetector,
	hostname string,
	nodeIP net.IP,
	recorder events.EventRecorder,
	healthzServer healthcheck.ProxierHealthUpdater,
	nodePortAddressStrings []string,
) (*Proxier, error) {
	// (中略)

	proxier := &Proxier{
		svcPortMap:               make(proxy.ServicePortMap),
		serviceChanges:           proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
		endpointsMap:             make(proxy.EndpointsMap),
		endpointsChanges:         proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil),
		needFullSync:             true,
		syncPeriod:               syncPeriod,
		iptables:                 ipt,
		masqueradeAll:            masqueradeAll,
		masqueradeMark:           masqueradeMark,
		exec:                     exec,
		localDetector:            localDetector,
		hostname:                 hostname,
		nodeIP:                   nodeIP,
		recorder:                 recorder,
		serviceHealthServer:      serviceHealthServer,
		healthzServer:            healthzServer,
		precomputedProbabilities: make([]string, 0, 1001),
		iptablesData:             bytes.NewBuffer(nil),
		existingFilterChainsData: bytes.NewBuffer(nil),
		filterChains:             proxyutil.NewLineBuffer(),
		filterRules:              proxyutil.NewLineBuffer(),
		natChains:                proxyutil.NewLineBuffer(),
		natRules:                 proxyutil.NewLineBuffer(),
		localhostNodePorts:       localhostNodePorts,
		nodePortAddresses:        nodePortAddresses,
		networkInterfacer:        proxyutil.RealNetwork{},
	}

    burstSyncs := 2
	klog.V(2).InfoS("Iptables sync params", "ipFamily", ipt.Protocol(), "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
	// We pass syncPeriod to ipt.Monitor, which will call us only if it needs to.
	// We need to pass *some* maxInterval to NewBoundedFrequencyRunner anyway though.
	// time.Hour is arbitrary.
	proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs)

	// (中略)

	return proxier, nil
}

iptables.NewProxier() 返回的 Proxier 结构体实现了 proxy.Provider 接口,同一文件中还实现了各种方法如 ServiceHandler、EndpointSliceHandler 接口,以及 sync()、syncLoop() 方法。同时,在这些方法中调用了 syncProxyRules() 方法来实际操作 iptables。根据资源的状态来具体操作 iptables 的方式可以通过查看 syncProxyRules() 方法的内容来了解。(顺便提一下,这个方法大概有800行…)


// #L759

// This is where all of the iptables-save/restore calls happen.
// The only other iptables rules are those that are setup in iptablesInit()
// This assumes proxier.mu is NOT held
func (proxier *Proxier) syncProxyRules() {
	proxier.mu.Lock()
	defer proxier.mu.Unlock()

    // don't sync rules till we've received services and endpoints
	if !proxier.isInitialized() {
		klog.V(2).InfoS("Not syncing iptables until Services and Endpoints have been received from master")
		return
	}
 
    (後略)
}

总结

本文简要介绍了kube-proxy的概述和实现,以及作为确认iptables在iptables代理模式下如何使用的准备工作。下次我们将具体查看kube-proxy中iptables的使用情况。

在你读完最后一句之前,非常感谢。

请参阅引用资料

[1] Kubernetes的组件 | Kubernetes
[2] 虚拟IP和服务代理 | Kubernetes
[3] iptables | ArchWiki
[4] kubernetes/kubernetes | GitHub

广告
将在 10 秒后关闭
bannerAds