我尝试创建了一个能够更改 CronJob 和 Argo CronWorkflow 的定时任务调度的 API
在这篇文章中要做的事情。
我希望在Kubernetes中动态更改CronJob和Argo Workflows的CronWorkflow两种资源的调度。您可以使用以下的kubectl命令来应用补丁实现这一点。
kubectl patch cronworkflow \
cronworkflow-name \
--namespace hoge \
--type='json' \
-p='[
{
"op": "replace",
"path": "/spec/schedule",
"value": "* * * * *"
}
]'
在本篇文章中,假设非工程师通过管理界面进行更改,我们创建了一个用于进行日程变更的 Web API,并通过 Pod 运行该 API 服务器,然后通过 Ingress 将其公开给外部使用。
环境
Kubernetes使用了kind(在Docker中运行的Kubernetes)。
kind version
提前准备
安装 Argo CLI
我按照这个页面的步骤进行了安装。
本文将省略步骤。
操作顺序
本文介绍了如何使用Ingress NGINX,在80端口上公开HTTP服务器。
创建Kubernetes集群
请参考此页面准备kind的清单文件。
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
kubeadmConfigPatches:
- |
kind: InitConfiguration
nodeRegistration:
kubeletExtraArgs:
node-labels: "ingress-ready=true"
extraPortMappings:
- containerPort: 80
hostPort: 80
使用kind命令创建一个Kubernetes集群。
我将集群命名为foo-cluster,但实际上可以使用任何名字。
kind create cluster --name foo-cluster --config=foo-cluster.yaml
安装Ingress-Nginx控制器。
一般情况下,您可以按照本页面的步骤安装Ingress-Nginx控制器。
然而,本文将使用kind,按照本页面的步骤为kind安装Ingress-Nginx控制器。
kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/main/deploy/static/provider/kind/deploy.yaml
等待控制器启动。
kubectl wait --namespace ingress-nginx \
--for=condition=ready pod \
--selector=app.kubernetes.io/component=controller \
--timeout=90s
确认启动
kubectl get pods -n ingress-nginx
安装 Argo Workflows。
请按照此页面的步骤进行安装。
在撰写本文时,Argo Workflows的最新版本是v3.4.11。
kubectl create namespace argo
kubectl apply -n argo -f https://github.com/argoproj/argo-workflows/releases/download/v3.4.11/install.yaml
创建一个用于作业的命名空间。
我将作业的namespace名称更改为bar,但任何名称都可以。
kubectl create namespace bar
定期作业开始
我們將從稍後創建的API中添加功能,允許更改CronJob的計劃,因此我們將啟動一個用於測試的CronJob。我們將直接使用官方網站上的示例清單文件。
kubectl -n bar apply -f https://raw.githubusercontent.com/kubernetes/website/main/content/ja/examples/application/job/cronjob.yaml
确认启动
kubectl -n bar get cronjob
我会记住CronJob的名字是hello。
触发CronWorkflow
我們將從稍後建立的API中,使CronWorkflow的計劃可更改,因此我們將啟動一個用於測試的CronWorkflow。
我們直接使用Argo官方網站上的範例清單文件。
kubectl -n bar apply -f https://raw.githubusercontent.com/argoproj/argo-workflows/master/examples/cron-workflow.yaml
启动确认
argo -n bar cron list
记住CronWorkflow的名称是hello-world。
网络API服务器(使用Go语言)
我将使用client-go开发一个可以更改调度(schedule)的API。
我参考了这段源代码。
文件列表和源代码。
src directory
├── admin_server.go
├── go.mod
└── Dockerfile
管理服务器.go
我們使用了Echo作為網頁框架。
在啟動時,需要將一個命名空間名作為程式引數傳遞。
API包含以下4個。
GET /<>/cronjobs
namespaceに属するCronJobの一覧を取得します。
PUT /<>/cronjobs
CronJobのscheduleをupdateします。
GET /</cronworkflows
namespaceに属するCronWorkflowの一覧を取得します。
PUT /<>/cronworkflows
CronWorkflowのscheduleをupdateします。
package main
import (
"context"
argoclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"net/http"
"os"
"path/filepath"
)
type httpError struct {
Error string `json:"error"`
}
var (
namespace string
clientset *kubernetes.Clientset
argoClientset *argoclientset.Clientset
)
func getCronjobs(c echo.Context) error {
// レスポンス初期化
type cronjob struct {
Name string `json:"name"`
Schedule string `json:"schedule"`
}
type responseType struct {
Data []cronjob `json:"data"`
}
var response responseType
response.Data = []cronjob{}
// CronJobリスト取得
cjobs, err := clientset.BatchV1().CronJobs(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return c.JSON(http.StatusInternalServerError, httpError{Error: err.Error()})
}
// 各CronJobをレスポンスに追加
for _, cjob := range cjobs.Items {
cj := cronjob{
Name: cjob.Name,
Schedule: cjob.Spec.Schedule,
}
response.Data = append(response.Data, cj)
}
return c.JSON(http.StatusOK, response)
}
func updateCronjob(c echo.Context) error {
// リクエストボディ&レスポンスの型
type cronjob struct {
Name string `json:"name"`
Schedule string `json:"schedule"`
}
// リクエストボディを構造体にバインド
reqb := cronjob{}
if err := c.Bind(&reqb); err != nil {
return c.JSON(http.StatusBadRequest, httpError{Error: err.Error()})
}
cjClient := clientset.BatchV1().CronJobs(namespace)
// CronJob取得
cj, err := cjClient.Get(context.TODO(), reqb.Name, metav1.GetOptions{})
if err != nil {
return c.JSON(http.StatusBadRequest, httpError{Error: err.Error()})
}
// スケジュール変更
cj.Spec.Schedule = reqb.Schedule
// 更新適用
resultCj, err := cjClient.Update(context.TODO(), cj, metav1.UpdateOptions{})
if err != nil {
return c.JSON(http.StatusInternalServerError, httpError{Error: err.Error()})
}
// 更新された内容をレスポンスにする
response := cronjob{
Name: resultCj.Name,
Schedule: resultCj.Spec.Schedule,
}
return c.JSON(http.StatusOK, response)
}
func getCronworkflows(c echo.Context) error {
// レスポンス初期化
type cronworkflow struct {
Name string `json:"name"`
Schedule string `json:"schedule"`
}
type responseType struct {
Data []cronworkflow `json:"data"`
}
var response responseType
response.Data = []cronworkflow{}
// CronWorkflowリスト取得
cworkflows, err := argoClientset.ArgoprojV1alpha1().CronWorkflows(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return c.JSON(http.StatusInternalServerError, httpError{Error: err.Error()})
}
// 各CronWorkflowをレスポンスに追加
for _, cworkflow := range cworkflows.Items {
cw := cronworkflow{
Name: cworkflow.Name,
Schedule: cworkflow.Spec.Schedule,
}
response.Data = append(response.Data, cw)
}
return c.JSON(http.StatusOK, response)
}
func updateCronworkflow(c echo.Context) error {
// リクエストボディ&レスポンスの型
type cronworkflow struct {
Name string `json:"name"`
Schedule string `json:"schedule"`
}
// リクエストボディを構造体にバインド
reqb := cronworkflow{}
if err := c.Bind(&reqb); err != nil {
return c.JSON(http.StatusBadRequest, httpError{Error: err.Error()})
}
cwClient := argoClientset.ArgoprojV1alpha1().CronWorkflows(namespace)
// CronWorkflow取得
cw, err := cwClient.Get(context.TODO(), reqb.Name, metav1.GetOptions{})
if err != nil {
return c.JSON(http.StatusBadRequest, httpError{Error: err.Error()})
}
// スケジュール変更
cw.Spec.Schedule = reqb.Schedule
// 更新適用
resultCw, err := cwClient.Update(context.TODO(), cw, metav1.UpdateOptions{})
if err != nil {
return c.JSON(http.StatusInternalServerError, httpError{Error: err.Error()})
}
// 更新された内容をレスポンスにする
response := cronworkflow{
Name: resultCw.Name,
Schedule: resultCw.Spec.Schedule,
}
return c.JSON(http.StatusOK, response)
}
// 開発用。本番コードでは不要
func outClusterConfig() (*rest.Config, error) {
// Create client
kubeconfig, ok := os.LookupEnv("KUBECONFIG")
if !ok {
kubeconfig = filepath.Join(homedir.HomeDir(), ".kube", "config")
}
return clientcmd.BuildConfigFromFlags("", kubeconfig)
}
func main() {
// Program argsでKubernetesのnamespaceを指定
if len(os.Args) != 2 {
panic("assert len(os.Args) == 2")
}
namespace = os.Args[1]
// creates the in-cluster config
config, err := rest.InClusterConfig()
if err != nil {
config, err = outClusterConfig() // 開発用。本番コードでは消す
if err != nil {
panic(err.Error())
}
}
// creates the clientset and argoClientset
clientset, err = kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
argoClientset, err = argoclientset.NewForConfig(config)
if err != nil {
panic(err.Error())
}
// echoでAPIサーバー
e := echo.New()
e.Use(middleware.Logger())
e.Use(middleware.Recover())
// Routing
api := e.Group("/" + namespace)
api.GET("/cronjobs", getCronjobs)
api.PUT("/cronjobs", updateCronjob)
api.GET("/cronworkflows", getCronworkflows)
api.PUT("/cronworkflows", updateCronworkflow)
e.Logger.Fatal(e.Start(":1323"))
}
go.mod文件
module admin_server
go 1.20
require (
github.com/argoproj/argo-workflows/v3 v3.4.11
github.com/labstack/echo/v4 v4.11.1
k8s.io/apimachinery v0.28.2
k8s.io/client-go v0.28.2
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.10.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/imdario/mergo v0.3.13 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/labstack/gommon v0.4.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
golang.org/x/crypto v0.12.0 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/oauth2 v0.11.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/term v0.11.0 // indirect
golang.org/x/text v0.12.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230706204954-ccb25ca9f130 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230720185612-659f7aaaa771 // indirect
google.golang.org/grpc v1.56.2 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.28.2 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)
Docker镜像文件
FROM golang:bullseye as builder
WORKDIR /app
COPY ./admin_server.go ./
COPY ./go.mod ./
RUN go mod tidy
RUN go build -o admin_server admin_server.go
FROM debian:bullseye
WORKDIR /app
COPY --from=builder /app/admin_server /app/
ENTRYPOINT ["/app/admin_server"]
Docker构建
docker build ./ -t admin-server
将Docker镜像注册到kind中
kind load docker-image admin-server --name=foo-cluster
创建用于API服务器的服务帐户。
为了在Pod中运行API服务器,需要创建一个服务账户。
已准备以下清单文件。
※ 服务账户的参考资源(谢谢你)
-
- https://qiita.com/knqyf263/items/ecc799650fe247dce9c5
- https://hyoublog.com/2022/05/02/kubernetes-serviceaccount/
apiVersion: v1
kind: ServiceAccount
metadata:
name: admin-sa
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: admin-role
rules:
- apiGroups:
- batch
verbs:
- list
- get
- update
resources:
- cronjobs
- apiGroups:
- argoproj.io
verbs:
- list
- get
- update
resources:
- cronworkflows
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: admin-role-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: admin-role
subjects:
- kind: ServiceAccount
name: admin-sa
创建服务帐号。
kubectl -n bar apply -f admin-sa.yaml
创建用于API服务器的Service。
准备以下清单文件。
-
- 先ほど作成したサービスアカウント「admin-sa」を指定します。
-
- APIサーバー起動時のプログラム引数として、namespace(本記事では”bar”)を渡すように設定します。
- typeはNodePortにし、ポート8888でAPIサーバーに繋がるようにしました。
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: bar-admin-deployment
namespace: bar
labels:
app: bar-admin-server
spec:
replicas: 1
selector:
matchLabels:
app: bar-admin-server
template:
metadata:
labels:
app: bar-admin-server
spec:
serviceAccountName: admin-sa
containers:
- name: bar-admin-container
image: admin-server:latest
imagePullPolicy: Never
args: ["bar"]
---
apiVersion: v1
kind: Service
metadata:
name: bar-admin-nodeport
namespace: bar
labels:
app: bar-admin-server
spec:
type: NodePort
ports:
- port: 8888
targetPort: 1323
selector:
app: bar-admin-server
我会启动Service。
kubectl apply -f bar-admin-nodeport.yaml
确认启动:确保以bar-admin-deployment-开头的NAME的pod已经启动了。
kubectl -n bar get pod
服务的操作确认
暂时在这里进行API服务器的运行确认。
我将提供两种方法。
使用kubectl debug命令的方法
由于该文章的副本被设置为1,因此可以使用以下命令进行调试工作。
kubectl -n bar debug $(kubectl -n bar get --no-headers=true pods -l app=bar-admin-server -o custom-columns=:metadata.name) --image=curlimages/curl -it -- sh
我尝试调用四个 API。
curl http://bar-admin-nodeport.bar:8888/bar/cronjobs
curl http://bar-admin-nodeport.bar:8888/bar/cronworkflows
curl --location --request PUT 'http://bar-admin-nodeport.bar:8888/bar/cronjobs' --header 'Content-Type: application/json' --data '{"name": "hello", "schedule": "1 0 * * *"}'
curl --location --request PUT 'http://bar-admin-nodeport.bar:8888/bar/cronworkflows' --header 'Content-Type: application/json' --data '{"name": "hello-world", "schedule": "2 0 * * *"}'
我确认了API能够正常运行。
使用 kubectl port-forward 命令的方式
kubectl -n bar port-forward svc/bar-admin-nodeport 30000:8888
我将尝试调用四个API。
curl http://localhost:30000/bar/cronjobs
curl http://localhost:30000/bar/cronworkflows
curl --location --request PUT 'http://localhost:30000/bar/cronjobs' --header 'Content-Type: application/json' --data '{"name": "hello", "schedule": "3 0 * * *"}'
curl --location --request PUT 'http://localhost:30000/bar/cronworkflows' --header 'Content-Type: application/json' --data '{"name": "hello-world", "schedule": "4 0 * * *"}'
我已确认API正常运行。
创造了Ingress
为了将API公开给外部使用,我们使用Ingress。
下面准备了以下的清单文件。
- 本番ではhttpsで公開すべきですが、本記事ではhttpにしました。
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: bar-admin-ingress
namespace: bar
spec:
ingressClassName: "nginx"
rules:
- http:
paths:
- path: /bar
pathType: Prefix
backend:
service:
name: bar-admin-nodeport
port:
number: 8888
我要启动Ingress。
kubectl apply -f bar-admin-ingress.yaml
确认启动
kubectl -n bar get ingress
请确认Ingress的操作是否正常。
将”your_host”替换为机器的主机名,然后尝试调用4个API。
curl http://your_host/bar/cronjobs
curl http://your_host/bar/cronworkflows
curl --location --request PUT 'http://your_host/bar/cronjobs' --header 'Content-Type: application/json' --data '{"name": "hello", "schedule": "5 0 * * *"}'
curl --location --request PUT 'http://your_host/bar/cronworkflows' --header 'Content-Type: application/json' --data '{"name": "hello-world", "schedule": "6 0 * * *"}'
我们确认了API能够正常运行。
解决问题的结果
删除最初创建的Kubernetes集群。
kind delete clusters foo-cluster