阅读Dapr状态查询的实现(Visitor模式示例)

太长不看 bú

    • DaprのStateは、様々なDB実装(MongoDB, MySQL, Redis…)を抽象化し統一的な操作ができるようにしている

 

    • State Queryによって条件に応じたデータの一覧取得が可能

 

    • State Query内部では、Visitorパターンを使いDaprのクエリを各実装(DB)のクエリに変換

 

    →将来Stateに新しい実装が追加されても実装修正が最小限で済む!

首先

新年快乐。

Dapr以“组件”的形式对持久化和PubSub等机制进行了抽象化,使得中间件可以无需意识到其具体是什么而直接使用。

 

一方、永続化機構「State」ではDBが抽象化される引き換えに、長らく単純な取得、更新(レコードのIDの指定が必須)しかできませんでした。

しかし、v1.5.0からStateでもクエリが使えるようになり、ついに条件式での絞り込みによる一覧取得ができるようになりました!

 

利用者としてはState活用の幅が広がって願ったりかなったりです。一方、Dapr側の実装を考えると、Stateの条件式を各DB固有のクエリ文に変換しなければならないので大変そうです…

気になったのでコードを読んでみたところ、Visitorパターンを使ってクエリ変換の複雑さをなるべく軽減させる工夫がなされていました。

本記事では、Visitorパターンの実例として、このState Queryの実装を紹介したいと思います。

版本

    Dapr v1.5.1

由于查询状态功能目前处于α版本阶段,可能会有较大的变动。请您提前知悉,由于此文章的内容可能会变得过时,请谅解。

State対応状況

 

目前仅有1.5.1版本可供使用。

    • MongoDB

 

    Azure CosmosDB

只需要一种选择。

关于Dapr状态查询

実装の話に入る前に、Dapr State Queryの機能を軽く見ていきます。
公式ガイドに試す方法が載っているので、そのやり方に従います。

 

准备

# ファイルの準備(↑のサイトからダウンロード)
$ tree
.
└── query-api-examples
    ├── components
    │   └── mongodb.yml
    └── dataset.json

2 directories, 2 files

# daprdの起動
$ dapr init
$ dapr -v
CLI version: 1.5.1 
Runtime version: 1.5.1

# MongoDBとデモアプリを起動
$ docker run -d --rm -p 27017:27017 --name mongodb mongo:5
$ dapr run --app-id demo --dapr-http-port 3500 --components-path query-api-examples/components

# サンプルデータをstateに保存
$ curl -X POST -H "Content-Type: application/json" -d @query-api-examples/dataset.json http://localhost:3500/v1.0/state/statestore

国家查询请求

当在过滤器中写入条件时,可以批量检索与条件匹配的数据。要指定目标键,可以使用类似于jsonpath的路径格式。

(2022/4/16更新:以前写作时提到了「类似jq的路径格式」,但在参考资料中发现了「类似jsonpath的格式」的说明,因此进行了修正。)

// data中 ".state" の値が "CA" に等しいものを取得
$ curl -s -X POST -H "Content-Type: application/json" -d '{"query": {"filter": {"EQ": {"value.state": "CA"}}}}' http://localhost:3500/v1.0-alpha1/state/statestore/query | jq .
{
  "results": [
    {
      "key": "3",
      "data": {
        "city": "Sacramento",
        "state": "CA",
        "person": {
          "id": {
            "$numberDouble": "1071.0"
          },
          "org": "Finance"
        }
      },
      "etag": "26838a01-da42-4663-85cb-2e0c09ba6a30"
    },
    {
      "key": "5",
      "data": {
        "person": {
          "org": "Hardware",
          "id": {
            "$numberDouble": "1007.0"
          }
        },
        "city": "Los Angeles",
        "state": "CA"
      },
      "etag": "b5ae26a9-fa38-4298-9e8c-7be68168ae8e"
    },
    {
      "key": "7",
      "data": {
        "person": {
          "org": "Dev Ops",
          "id": {
            "$numberDouble": "1015.0"
          }
        },
        "city": "San Francisco",
        "state": "CA"
      },
      "etag": "8489b9e7-ba4a-411e-9ede-f9db09cf848a"
    },
    {
      "key": "9",
      "data": {
        "person": {
          "id": {
            "$numberDouble": "1002.0"
          },
          "org": "Finance"
        },
        "city": "San Diego",
        "state": "CA"
      },
      "etag": "157cdf03-cf20-42f4-9d52-22672cce26de"
    }
  ]
}

以下是可用于过滤器的四种类型。

フィルタ効果例EQ指定したキーの値が指定した値と等しい{"EQ": {"value.foo": "bar"}}IN指定したキーの値が指定した配列に含まれる{"IN": {"value.foo": ["bar", "baz"]}}ANDフィルタを全て満たす{"AND": [{"EQ": ...}, {"IN": ...}]}ORフィルタを1つでも満たす{"OR": [{"EQ": ...}, {"IN": ...}]}

 

如果未指定查询过滤器,将获取State的所有记录。

$ curl -s -X POST -H "Content-Type: application/json" -d '{"query": {}}' http://localhost:3500/v1.0-alpha1/state/statestore/query  | jq .
{
  "results": [
    {
      "key": "1",
      "data": {
        "person": {
          "org": "Dev Ops",
          "id": {
            "$numberDouble": "1036.0"
          }
        },
        "city": "Seattle",
        "state": "WA"
      },
      "etag": "00858945-c8bb-48d0-811f-66c0fa1f0780"
    },
    ...
  ]
}

你是如何实现的?

“Dapr State 的查询不论实现如何,尽管形式相同,但内部肯定需要将其转换为各个数据库实现的查询。它是如何实现的呢?”

使用访问者模式进行设计

通过使用访问者模式,可以实现一个设计,仅需替换”将状态查询转换为组件实现(如MongoDB等)的查询”这部分。

 

由于输入和生成物都具有“查询”这个术语,所以以下将明确区分Dapr状态查询和组件实现查询。

dapr_state.drawio.png

设计的主要角色如下所示。

设计的主要人物包括以下人员。

Querier: Dapr state query APIのハンドラ(リクエストを処理しレスポンスを返す)

Builder: Dapr stateクエリからComponent実装のクエリを生成

Visitor: Dapr stateクエリの要素に応じた、Component実装のクエリ片を生成

Query: リクエストされたDapr stateクエリ

由于Dapr状态查询的数据结构查询和实现相关的查询转换处理程序被分离,因此只需增加Visitor的实现即可,而不需要对其他部分进行更改,即使添加了组件。

让我们来看一下实际处理的流程。

Dapr state query APIのリクエストを受け取る

Querierの実装*mongodb.MongoDBは、リクエスト中のDapr stateクエリをBuilderに渡し、MongoDBのクエリに変換します。
その後、変換されたクエリを使ってMongoDBにリクエストします。

func (m *MongoDB) Query(req *state.QueryRequest) (*state.QueryResponse, error) {
	q := &Query{} // MongoDBのクエリ
	qbuilder := query.NewQueryBuilder(q)
	// dapr stateのクエリ req.Query からMongoDBのクエリを生成し、 q に書き込み
	if err := qbuilder.BuildQuery(&req.Query); err != nil {
		return &state.QueryResponse{}, err
	}
	// MongoDBにリクエスト
	data, token, err := q.execute(ctx, m.collection)
	if err != nil {
		return &state.QueryResponse{}, err
	}

	return &state.QueryResponse{
		Results: data,
		Token:   token,
	}, nil
}

BuilderがDapr stateクエリをMongoDBのクエリに変換

建造者将按照以下的两个步骤来转换查询。

    • Dapr stateのフィルタをComponent実装のクエリ文字列に変換

MongoDBの場合、クエリ文字列は “{\”foo\”: \”bar\”}” のような形式

上記のクエリ文字列からComponentにリクエストするデータ構造に変換

MongoDBの場合、BSON形式に変換され Visitor 内部で保持される

func (h *Builder) BuildQuery(q *Query) error {
	// Dapr stateのフィルタからcomponent実装のフィルタ(を表すクエリ文字列)を生成
	filters, err := h.buildFilter(q.Filter)
	if err != nil {
		return err
	}

	// 上記で生成したフィルタ文字列をもとに、component実装のクエリを生成
	return h.visitor.Finalize(filters, q)
}

生成クエリ文字列の処理は、Componentの実装ではなく、Visitorに委託します。

    • Builder: Dapr Stateクエリの内部構造を提供(全Component実装で共通の部分)

 

    Visitor: Dapr Stateクエリの各要素をComponent実装のクエリの形式に変換(Component実装ごとに異なる)

能够保持松散耦合。

func (h *Builder) buildFilter(filter Filter) (string, error) {
	if filter == nil {
		return "", nil
	}
  // Builderは「このフィルタが来たらこのメソッドを呼ぶ」というところまでしか管理しない。実際の文字列生成処理はVisitorの実装が担う。
	switch f := filter.(type) {
	case *EQ:
		return h.visitor.VisitEQ(f)
	case *IN:
		return h.visitor.VisitIN(f)
	case *OR:
		return h.visitor.VisitOR(f)
	case *AND:
		return h.visitor.VisitAND(f)
	default:
		return "", fmt.Errorf("unsupported filter type %#v", filter)
	}
}

在Dapr状态查询的过滤器中,Visitor的实现*mongodb.Query会生成相应的MongoDB查询片段。

func (q *Query) VisitEQ(f *query.EQ) (string, error) {
	// { <key>: <val> }
	return fmt.Sprintf("{ %q: %q }", f.Key, f.Val), nil
}

func (q *Query) VisitIN(f *query.IN) (string, error) {
	// { $in: [ <val1>, <val2>, ... , <valN> ] }
	if len(f.Vals) == 0 {
		return "", fmt.Errorf("empty IN operator for key %q", f.Key)
	}
	str := fmt.Sprintf(`{ %q: { "$in": [ %q`, f.Key, f.Vals[0])
	for _, v := range f.Vals[1:] {
		str += fmt.Sprintf(", %q", v)
	}
	str += " ] } }"

	return str, nil
}

// VisitAnd, VisitOrも同様 (再帰処理が入り長いので略)

游客生成的过滤器字符串将在Finalize时转变为完整的BSON查询。
转换完成的查询将保存在q.filter中,在Visitor中进行管理。

func (q *Query) Finalize(filters string, qq *query.Query) error {
	q.query = filters
	if len(filters) == 0 {
		q.filter = bson.D{}
	} else if err := bson.UnmarshalExtJSON([]byte(filters), false, &q.filter); err != nil {
		return err
	}
	q.opts = options.Find()

	// Dapr state queryのソートとページネーションもBSONに変換し追加
	// ...

	return nil
}

向MongoDB发送MongoDB查询请求

最后,Dapr状态查询API处理程序将委托给Visitor向组件实现发出请求。

再次提到

func (m *MongoDB) Query(req *state.QueryRequest) (*state.QueryResponse, error) {
	q := &Query{}
	qbuilder := query.NewQueryBuilder(q)
	if err := qbuilder.BuildQuery(&req.Query); err != nil {
		return &state.QueryResponse{}, err
	}
	// ★ MongoDBにリクエスト
	data, token, err := q.execute(ctx, m.collection)
	if err != nil {
		return &state.QueryResponse{}, err
	}

	return &state.QueryResponse{
		Results: data,
		Token:   token,
	}, nil
}

访问者将使用先前创建的BSON查询(q.filter)向MongoDB发送请求。

func (q *Query) execute(ctx context.Context, collection *mongo.Collection) ([]state.QueryItem, string, error) {
	cur, err := collection.Find(ctx, q.filter, []*options.FindOptions{q.opts}...)
	if err != nil {
		return nil, "", err
	}
	// レスポンスの詰め替え処理
	// ...

	return ret, token, nil
}

我认为execute非公开是因为参数是实现相关性的缘故1。

最后,这个返回值将作为Dapr状态查询API的响应返回。辛苦了。

最后

以前从未使用过访问者模式,我一直在思考它是否真的有用。但是当我看到实际的例子后,我恍然大悟。
当我再次回顾《Java语言学习 设计模式入门》时,第2章的内容变得更加清晰明了。

添加新的ConcreteVisitor角色很容易。具体的处理可以交给ConcreteVisitor角色,而且不需要修改任何ConcreteElement角色来进行处理。

确实写得很好。

由于Dapr组件是抽象化的地方,因此可能还使用了其他各种设计模式。如果有机会的话,我希望能够阅读其他的实现部分。

实际上,CosmosDB的execute函数的签名是不同的。
顺便提一句,我手上的是一份旧版本。
广告
将在 10 秒后关闭
bannerAds