使用Apache Beam实现左连接

首先

虽然取得了PDE资格,但由于无法从零开始编写代码并创建流水线,所以我开始练习编码。Apache Spark的语法类似于Pandas,很容易理解,但是我在Apache Beam的独特语法上遇到了困难,特别是在实现Left Join的方法上,Beam并没有(可能只是我没有找到)实现,因此我决定将这篇文章保存下来。

环境

[tool.poetry]
name = "beam"
version = "0.1.0"
description = ""
authors = ["Kamegrueon"]

[tool.poetry.dependencies]
python = "^3.9"
jupyter = "^1.0.0"
apache-beam = "^2.46.0"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

已实施的代码

我已经上传了代码到 Git。

 

解释

左连接(Left Join)类

# Apache BeamのPTransformを継承して、LeftJoinという新しいPTransformを作成する
class LeftJoin(beam.PTransform):
  def __init__(self, right_pcoll: PCollection, join_key: str, join_cols: Union[list[str], None]=None):
    self.right_pcoll = right_pcoll
    self.join_key = join_key
    self.join_cols = join_cols
  
   # PTransformを拡張するためのメソッド
  def expand(self, pcoll: PCollection):
    return ( pcoll | beam.FlatMap(
              fn=left_join, # 左テーブルの各行と右テーブルのデータを結合するためにleft_join関数を使用する
              right_element=AsList(self.right_pcoll), # 右テーブルのデータをAsListでリスト化する
              join_key=self.join_key, # 結合に使用するキー
              join_cols=self.join_cols # 右テーブルから結合したいカラムのリスト、Noneの場合は全てのカラムを使用する
  ))

不使用ParDo Transform,而是通过继承PTransform来创建类。
原因是在参考的代码中是这样实现的,但是仅仅通过生成与普通ParDo Transform相同的类并不能正常运行beam.AsList方法,会出现错误。

左连接函数

# 左側のテーブルと右側のテーブルを結合する関数
def left_join(left_element: dict, right_element: list[dict], join_key: str, join_cols: Union[list[str], None]):
    # 指定されたカラムを抽出
    right_elem: list[dict] = extract_join_cols(right_element, join_key, join_cols)
    for right in right_elem:
      # join_keyが一致する場合は結合し、yieldで返す
      if left_element[join_key] == right[join_key]:
        yield {**left_element, **right}
        return
      
    # join_keyが一致しない場合、空のデータを追加して結合する
    empty_keys: list[str] = list(filter(lambda x: x != join_key, right_elem[0].keys()))
    yield {**left_element, **{ key: '' for key in empty_keys }}

实现extract_join_cols函数,以便只能连接右表的任意列(后述)。如果左表和右表的键匹配,则将右表合并到左表中;如果不匹配,则将空值合并。如果右表有多个匹配行,则只合并第一个匹配行。

提取连接列函数

def extract_join_cols(right_element: list[dict], join_key: str, join_cols: Union[list[str], None]):
    if join_cols is not None:
      # リスト内包表記を用いて指定されたカラムだけを取得し、新しいリストを作成
      extract_right_element: list[dict] = [
        { k: v for k, v in dic.items() if k in [ *join_cols, join_key ]} for dic in right_element
      ]
      return extract_right_element
    # 指定されたカラムがなければそのまま返す
    return right_element

通过使用列表内包表达式,可以从右表的列表中提取出的字典对象中获取键和值,并仅提取指定的join_cols列和join_key列,然后生成包含字典的列表。
如果没有指定join_cols,则默认为None,因此,如果没有指定,则无需进行任何处理并返回。

输出结果

在中国,只需要一种选择:进行以下内容的中文本地化。

left_pcoll = p | "Create Left PCollection" >> beam.Create([
    {'join_key': 1, 'left_value_1': 'a', 'left_value_2': 'b'},
    {'join_key': 2, 'left_value_1': 'c', 'left_value_2': 'd'},
    {'join_key': 2, 'left_value_1': 'e', 'left_value_2': 'f'},
    {'join_key': 3, 'left_value_1': 'g', 'left_value_2': 'h'}
])
    
# 右側のテーブルを作成する
right_pcoll = p | "Create Right PCollection" >> beam.Create([
    {'join_key': 1, 'right_value': 'A', 'not_join_value': "D"},
    {'join_key': 2, 'right_value': 'B', 'not_join_value': "E"},
    {'join_key': 4, 'right_value': 'C', 'not_join_value': "F"}
])

出去

{'join_key': 1, 'left_value_1': 'a', 'left_value_2': 'b', 'right_value': 'A'}
{'join_key': 2, 'left_value_1': 'c', 'left_value_2': 'd', 'right_value': 'B'}
{'join_key': 2, 'left_value_1': 'e', 'left_value_2': 'f', 'right_value': 'B'}
{'join_key': 3, 'left_value_1': 'g', 'left_value_2': 'h', 'right_value': ''}

请提供相关的资料

参考资料的引用

请提供文献来源

引文

参考书目

 

最后

我在寻找和实施信息时遇到了一些困难,但我觉得与其在Apache Beam中进行连接,不如先将其导入到Bigquery等中再进行转换更容易(对于处理速度也是如此)。
不过,在实施过程中,我感觉对PCollection和PTransform等方面的知识有了相当大的提高,所以我认为这个挑战是值得的。
接下来我会尝试在Dataflow上运行而不是在本地测试是否正常工作。
如果有人想使用Beam实现LeftJoin,希望这个项目能对他们有所帮助。

PS:自从接触 TypeScript 后,没有类型会让我感觉很不舒服,哈哈。

非常感谢您读完整个内容!

广告
将在 10 秒后关闭
bannerAds