我尝试创建了一个能够更改 CronJob 和 Argo CronWorkflow 的定时任务调度的 API

在这篇文章中要做的事情。

我希望在Kubernetes中动态更改CronJob和Argo Workflows的CronWorkflow两种资源的调度。您可以使用以下的kubectl命令来应用补丁实现这一点。

kubectl patch cronworkflow \
  cronworkflow-name \
  --namespace hoge \
  --type='json' \
  -p='[
    {
      "op": "replace",
      "path": "/spec/schedule",
      "value": "* * * * *"
    }
  ]'

在本篇文章中,假设非工程师通过管理界面进行更改,我们创建了一个用于进行日程变更的 Web API,并通过 Pod 运行该 API 服务器,然后通过 Ingress 将其公开给外部使用。

环境

Kubernetes使用了kind(在Docker中运行的Kubernetes)。

kind version
image.png

提前准备

安装 Argo CLI

我按照这个页面的步骤进行了安装。
本文将省略步骤。

操作顺序

本文介绍了如何使用Ingress NGINX,在80端口上公开HTTP服务器。

创建Kubernetes集群

请参考此页面准备kind的清单文件。

kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
  kubeadmConfigPatches:
  - |
    kind: InitConfiguration
    nodeRegistration:
      kubeletExtraArgs:
        node-labels: "ingress-ready=true"
  extraPortMappings:
  - containerPort: 80
    hostPort: 80

使用kind命令创建一个Kubernetes集群。
我将集群命名为foo-cluster,但实际上可以使用任何名字。

kind create cluster --name foo-cluster --config=foo-cluster.yaml
image.png

安装Ingress-Nginx控制器。

一般情况下,您可以按照本页面的步骤安装Ingress-Nginx控制器。
然而,本文将使用kind,按照本页面的步骤为kind安装Ingress-Nginx控制器。

kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/main/deploy/static/provider/kind/deploy.yaml
image.png

等待控制器启动。

kubectl wait --namespace ingress-nginx \
  --for=condition=ready pod \
  --selector=app.kubernetes.io/component=controller \
  --timeout=90s
image.png

确认启动

kubectl get pods -n ingress-nginx
image.png

安装 Argo Workflows。

请按照此页面的步骤进行安装。
在撰写本文时,Argo Workflows的最新版本是v3.4.11。

kubectl create namespace argo

kubectl apply -n argo -f https://github.com/argoproj/argo-workflows/releases/download/v3.4.11/install.yaml
image.png

创建一个用于作业的命名空间。

我将作业的namespace名称更改为bar,但任何名称都可以。

kubectl create namespace bar
image.png

定期作业开始

我們將從稍後創建的API中添加功能,允許更改CronJob的計劃,因此我們將啟動一個用於測試的CronJob。我們將直接使用官方網站上的示例清單文件。

kubectl -n bar apply -f https://raw.githubusercontent.com/kubernetes/website/main/content/ja/examples/application/job/cronjob.yaml
image.png

确认启动

kubectl -n bar get cronjob
image.png

我会记住CronJob的名字是hello。

触发CronWorkflow

我們將從稍後建立的API中,使CronWorkflow的計劃可更改,因此我們將啟動一個用於測試的CronWorkflow。
我們直接使用Argo官方網站上的範例清單文件。

kubectl -n bar apply -f https://raw.githubusercontent.com/argoproj/argo-workflows/master/examples/cron-workflow.yaml
image.png

启动确认

argo -n bar cron list
image.png

记住CronWorkflow的名称是hello-world。

网络API服务器(使用Go语言)

我将使用client-go开发一个可以更改调度(schedule)的API。
我参考了这段源代码。

文件列表和源代码。

src directory
  ├── admin_server.go
  ├── go.mod
  └── Dockerfile

管理服务器.go

我們使用了Echo作為網頁框架。
在啟動時,需要將一個命名空間名作為程式引數傳遞。
API包含以下4個。

GET /<>/cronjobs
namespaceに属するCronJobの一覧を取得します。

PUT /<>/cronjobs
CronJobのscheduleをupdateします。

GET /</cronworkflows
namespaceに属するCronWorkflowの一覧を取得します。

PUT /<>/cronworkflows
CronWorkflowのscheduleをupdateします。

package main

import (
	"context"
	argoclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
	"net/http"
	"os"
	"path/filepath"
)

type httpError struct {
	Error string `json:"error"`
}

var (
	namespace     string
	clientset     *kubernetes.Clientset
	argoClientset *argoclientset.Clientset
)

func getCronjobs(c echo.Context) error {
	// レスポンス初期化
	type cronjob struct {
		Name     string `json:"name"`
		Schedule string `json:"schedule"`
	}
	type responseType struct {
		Data []cronjob `json:"data"`
	}
	var response responseType
	response.Data = []cronjob{}

	// CronJobリスト取得
	cjobs, err := clientset.BatchV1().CronJobs(namespace).List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return c.JSON(http.StatusInternalServerError, httpError{Error: err.Error()})
	}

	// 各CronJobをレスポンスに追加
	for _, cjob := range cjobs.Items {
		cj := cronjob{
			Name:     cjob.Name,
			Schedule: cjob.Spec.Schedule,
		}
		response.Data = append(response.Data, cj)
	}

	return c.JSON(http.StatusOK, response)
}

func updateCronjob(c echo.Context) error {
	// リクエストボディ&レスポンスの型
	type cronjob struct {
		Name     string `json:"name"`
		Schedule string `json:"schedule"`
	}

	// リクエストボディを構造体にバインド
	reqb := cronjob{}
	if err := c.Bind(&reqb); err != nil {
		return c.JSON(http.StatusBadRequest, httpError{Error: err.Error()})
	}

	cjClient := clientset.BatchV1().CronJobs(namespace)

	// CronJob取得
	cj, err := cjClient.Get(context.TODO(), reqb.Name, metav1.GetOptions{})
	if err != nil {
		return c.JSON(http.StatusBadRequest, httpError{Error: err.Error()})
	}

	// スケジュール変更
	cj.Spec.Schedule = reqb.Schedule

	// 更新適用
	resultCj, err := cjClient.Update(context.TODO(), cj, metav1.UpdateOptions{})
	if err != nil {
		return c.JSON(http.StatusInternalServerError, httpError{Error: err.Error()})
	}

	// 更新された内容をレスポンスにする
	response := cronjob{
		Name:     resultCj.Name,
		Schedule: resultCj.Spec.Schedule,
	}
	return c.JSON(http.StatusOK, response)
}

func getCronworkflows(c echo.Context) error {
	// レスポンス初期化
	type cronworkflow struct {
		Name     string `json:"name"`
		Schedule string `json:"schedule"`
	}
	type responseType struct {
		Data []cronworkflow `json:"data"`
	}
	var response responseType
	response.Data = []cronworkflow{}

	// CronWorkflowリスト取得
	cworkflows, err := argoClientset.ArgoprojV1alpha1().CronWorkflows(namespace).List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return c.JSON(http.StatusInternalServerError, httpError{Error: err.Error()})
	}

	// 各CronWorkflowをレスポンスに追加
	for _, cworkflow := range cworkflows.Items {
		cw := cronworkflow{
			Name:     cworkflow.Name,
			Schedule: cworkflow.Spec.Schedule,
		}
		response.Data = append(response.Data, cw)
	}

	return c.JSON(http.StatusOK, response)
}

func updateCronworkflow(c echo.Context) error {
	// リクエストボディ&レスポンスの型
	type cronworkflow struct {
		Name     string `json:"name"`
		Schedule string `json:"schedule"`
	}

	// リクエストボディを構造体にバインド
	reqb := cronworkflow{}
	if err := c.Bind(&reqb); err != nil {
		return c.JSON(http.StatusBadRequest, httpError{Error: err.Error()})
	}

	cwClient := argoClientset.ArgoprojV1alpha1().CronWorkflows(namespace)

	// CronWorkflow取得
	cw, err := cwClient.Get(context.TODO(), reqb.Name, metav1.GetOptions{})
	if err != nil {
		return c.JSON(http.StatusBadRequest, httpError{Error: err.Error()})
	}

	// スケジュール変更
	cw.Spec.Schedule = reqb.Schedule

	// 更新適用
	resultCw, err := cwClient.Update(context.TODO(), cw, metav1.UpdateOptions{})
	if err != nil {
		return c.JSON(http.StatusInternalServerError, httpError{Error: err.Error()})
	}

	// 更新された内容をレスポンスにする
	response := cronworkflow{
		Name:     resultCw.Name,
		Schedule: resultCw.Spec.Schedule,
	}
	return c.JSON(http.StatusOK, response)
}

// 開発用。本番コードでは不要
func outClusterConfig() (*rest.Config, error) {
	// Create client
	kubeconfig, ok := os.LookupEnv("KUBECONFIG")
	if !ok {
		kubeconfig = filepath.Join(homedir.HomeDir(), ".kube", "config")
	}
	return clientcmd.BuildConfigFromFlags("", kubeconfig)
}

func main() {
	// Program argsでKubernetesのnamespaceを指定
	if len(os.Args) != 2 {
		panic("assert len(os.Args) == 2")
	}
	namespace = os.Args[1]

	// creates the in-cluster config
	config, err := rest.InClusterConfig()
	if err != nil {
		config, err = outClusterConfig() // 開発用。本番コードでは消す
		if err != nil {
			panic(err.Error())
		}
	}

	// creates the clientset and argoClientset
	clientset, err = kubernetes.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}
	argoClientset, err = argoclientset.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}

	// echoでAPIサーバー
	e := echo.New()
	e.Use(middleware.Logger())
	e.Use(middleware.Recover())

	// Routing
	api := e.Group("/" + namespace)
	api.GET("/cronjobs", getCronjobs)
	api.PUT("/cronjobs", updateCronjob)
	api.GET("/cronworkflows", getCronworkflows)
	api.PUT("/cronworkflows", updateCronworkflow)

	e.Logger.Fatal(e.Start(":1323"))
}

go.mod文件

module admin_server

go 1.20

require (
	github.com/argoproj/argo-workflows/v3 v3.4.11
	github.com/labstack/echo/v4 v4.11.1
	k8s.io/apimachinery v0.28.2
	k8s.io/client-go v0.28.2
)

require (
	github.com/davecgh/go-spew v1.1.1 // indirect
	github.com/emicklei/go-restful/v3 v3.10.0 // indirect
	github.com/go-logr/logr v1.2.4 // indirect
	github.com/go-openapi/jsonpointer v0.19.6 // indirect
	github.com/go-openapi/jsonreference v0.20.2 // indirect
	github.com/go-openapi/swag v0.22.3 // indirect
	github.com/gogo/protobuf v1.3.2 // indirect
	github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
	github.com/golang/protobuf v1.5.3 // indirect
	github.com/google/gnostic-models v0.6.8 // indirect
	github.com/google/go-cmp v0.5.9 // indirect
	github.com/google/gofuzz v1.2.0 // indirect
	github.com/google/uuid v1.3.0 // indirect
	github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
	github.com/imdario/mergo v0.3.13 // indirect
	github.com/josharian/intern v1.0.0 // indirect
	github.com/json-iterator/go v1.1.12 // indirect
	github.com/labstack/gommon v0.4.0 // indirect
	github.com/mailru/easyjson v0.7.7 // indirect
	github.com/mattn/go-colorable v0.1.13 // indirect
	github.com/mattn/go-isatty v0.0.19 // indirect
	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
	github.com/modern-go/reflect2 v1.0.2 // indirect
	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
	github.com/sirupsen/logrus v1.9.3 // indirect
	github.com/spf13/pflag v1.0.5 // indirect
	github.com/valyala/bytebufferpool v1.0.0 // indirect
	github.com/valyala/fasttemplate v1.2.2 // indirect
	golang.org/x/crypto v0.12.0 // indirect
	golang.org/x/net v0.14.0 // indirect
	golang.org/x/oauth2 v0.11.0 // indirect
	golang.org/x/sys v0.11.0 // indirect
	golang.org/x/term v0.11.0 // indirect
	golang.org/x/text v0.12.0 // indirect
	golang.org/x/time v0.3.0 // indirect
	google.golang.org/appengine v1.6.7 // indirect
	google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130 // indirect
	google.golang.org/genproto/googleapis/api v0.0.0-20230706204954-ccb25ca9f130 // indirect
	google.golang.org/genproto/googleapis/rpc v0.0.0-20230720185612-659f7aaaa771 // indirect
	google.golang.org/grpc v1.56.2 // indirect
	google.golang.org/protobuf v1.31.0 // indirect
	gopkg.in/inf.v0 v0.9.1 // indirect
	gopkg.in/yaml.v2 v2.4.0 // indirect
	gopkg.in/yaml.v3 v3.0.1 // indirect
	k8s.io/api v0.28.2 // indirect
	k8s.io/klog/v2 v2.100.1 // indirect
	k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
	k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 // indirect
	sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
	sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
	sigs.k8s.io/yaml v1.3.0 // indirect
)

Docker镜像文件

FROM golang:bullseye as builder
WORKDIR /app
COPY ./admin_server.go ./
COPY ./go.mod ./
RUN go mod tidy
RUN go build -o admin_server admin_server.go

FROM debian:bullseye
WORKDIR /app
COPY --from=builder /app/admin_server /app/
ENTRYPOINT ["/app/admin_server"]

Docker构建

docker build ./ -t admin-server

将Docker镜像注册到kind中

kind load docker-image admin-server --name=foo-cluster
image.png

创建用于API服务器的服务帐户。

为了在Pod中运行API服务器,需要创建一个服务账户。
已准备以下清单文件。

※ 服务账户的参考资源(谢谢你)

    • https://qiita.com/knqyf263/items/ecc799650fe247dce9c5

 

    https://hyoublog.com/2022/05/02/kubernetes-serviceaccount/
apiVersion: v1
kind: ServiceAccount
metadata:
  name: admin-sa
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: admin-role
rules:
  - apiGroups:
      - batch
    verbs:
      - list
      - get
      - update
    resources:
      - cronjobs
  - apiGroups:
      - argoproj.io
    verbs:
      - list
      - get
      - update
    resources:
      - cronworkflows
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: admin-role-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: admin-role
subjects:
  - kind: ServiceAccount
    name: admin-sa

创建服务帐号。

kubectl -n bar apply -f admin-sa.yaml
image.png

创建用于API服务器的Service。

准备以下清单文件。

    • 先ほど作成したサービスアカウント「admin-sa」を指定します。

 

    • APIサーバー起動時のプログラム引数として、namespace(本記事では”bar”)を渡すように設定します。

 

    typeはNodePortにし、ポート8888でAPIサーバーに繋がるようにしました。
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: bar-admin-deployment
  namespace: bar
  labels:
    app: bar-admin-server
spec:
  replicas: 1
  selector:
    matchLabels:
      app: bar-admin-server
  template:
    metadata:
      labels:
        app: bar-admin-server
    spec:
      serviceAccountName: admin-sa
      containers:
      - name: bar-admin-container
        image: admin-server:latest
        imagePullPolicy: Never
        args: ["bar"]
---
apiVersion: v1
kind: Service
metadata:
  name: bar-admin-nodeport
  namespace: bar
  labels:
    app: bar-admin-server
spec:
  type: NodePort
  ports:
  - port: 8888
    targetPort: 1323
  selector:
    app: bar-admin-server

我会启动Service。

kubectl apply -f bar-admin-nodeport.yaml
image.png

确认启动:确保以bar-admin-deployment-开头的NAME的pod已经启动了。

kubectl -n bar get pod
image.png

服务的操作确认

暂时在这里进行API服务器的运行确认。
我将提供两种方法。

使用kubectl debug命令的方法

由于该文章的副本被设置为1,因此可以使用以下命令进行调试工作。

kubectl -n bar debug $(kubectl -n bar get --no-headers=true pods -l app=bar-admin-server -o custom-columns=:metadata.name) --image=curlimages/curl -it -- sh
image.png

我尝试调用四个 API。

curl http://bar-admin-nodeport.bar:8888/bar/cronjobs
image.png
curl http://bar-admin-nodeport.bar:8888/bar/cronworkflows
image.png
curl --location --request PUT 'http://bar-admin-nodeport.bar:8888/bar/cronjobs' --header 'Content-Type: application/json' --data '{"name": "hello", "schedule": "1 0 * * *"}'
image.png
curl --location --request PUT 'http://bar-admin-nodeport.bar:8888/bar/cronworkflows' --header 'Content-Type: application/json' --data '{"name": "hello-world", "schedule": "2 0 * * *"}'
image.png

我确认了API能够正常运行。

使用 kubectl port-forward 命令的方式

kubectl -n bar port-forward svc/bar-admin-nodeport 30000:8888
image.png

我将尝试调用四个API。

curl http://localhost:30000/bar/cronjobs
image.png
curl http://localhost:30000/bar/cronworkflows
image.png
curl --location --request PUT 'http://localhost:30000/bar/cronjobs' --header 'Content-Type: application/json' --data '{"name": "hello", "schedule": "3 0 * * *"}'
image.png
curl --location --request PUT 'http://localhost:30000/bar/cronworkflows' --header 'Content-Type: application/json' --data '{"name": "hello-world", "schedule": "4 0 * * *"}'
image.png

我已确认API正常运行。

创造了Ingress

为了将API公开给外部使用,我们使用Ingress。
下面准备了以下的清单文件。

    本番ではhttpsで公開すべきですが、本記事ではhttpにしました。
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: bar-admin-ingress
  namespace: bar
spec:
  ingressClassName: "nginx"
  rules:
  - http:
      paths:
      - path: /bar
        pathType: Prefix
        backend:
          service:
            name: bar-admin-nodeport
            port:
              number: 8888

我要启动Ingress。

kubectl apply -f bar-admin-ingress.yaml
image.png

确认启动

kubectl -n bar get ingress
image.png

请确认Ingress的操作是否正常。

将”your_host”替换为机器的主机名,然后尝试调用4个API。

curl http://your_host/bar/cronjobs
image.png
curl http://your_host/bar/cronworkflows
image.png
curl --location --request PUT 'http://your_host/bar/cronjobs' --header 'Content-Type: application/json' --data '{"name": "hello", "schedule": "5 0 * * *"}'
image.png
curl --location --request PUT 'http://your_host/bar/cronworkflows' --header 'Content-Type: application/json' --data '{"name": "hello-world", "schedule": "6 0 * * *"}'
image.png

我们确认了API能够正常运行。

解决问题的结果

删除最初创建的Kubernetes集群。

kind delete clusters foo-cluster
image.png
广告
将在 10 秒后关闭
bannerAds