使用Apache Beam实现左连接
首先
虽然取得了PDE资格,但由于无法从零开始编写代码并创建流水线,所以我开始练习编码。Apache Spark的语法类似于Pandas,很容易理解,但是我在Apache Beam的独特语法上遇到了困难,特别是在实现Left Join的方法上,Beam并没有(可能只是我没有找到)实现,因此我决定将这篇文章保存下来。
环境
[tool.poetry]
name = "beam"
version = "0.1.0"
description = ""
authors = ["Kamegrueon"]
[tool.poetry.dependencies]
python = "^3.9"
jupyter = "^1.0.0"
apache-beam = "^2.46.0"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
已实施的代码
我已经上传了代码到 Git。
解释
左连接(Left Join)类
# Apache BeamのPTransformを継承して、LeftJoinという新しいPTransformを作成する
class LeftJoin(beam.PTransform):
def __init__(self, right_pcoll: PCollection, join_key: str, join_cols: Union[list[str], None]=None):
self.right_pcoll = right_pcoll
self.join_key = join_key
self.join_cols = join_cols
# PTransformを拡張するためのメソッド
def expand(self, pcoll: PCollection):
return ( pcoll | beam.FlatMap(
fn=left_join, # 左テーブルの各行と右テーブルのデータを結合するためにleft_join関数を使用する
right_element=AsList(self.right_pcoll), # 右テーブルのデータをAsListでリスト化する
join_key=self.join_key, # 結合に使用するキー
join_cols=self.join_cols # 右テーブルから結合したいカラムのリスト、Noneの場合は全てのカラムを使用する
))
不使用ParDo Transform,而是通过继承PTransform来创建类。
原因是在参考的代码中是这样实现的,但是仅仅通过生成与普通ParDo Transform相同的类并不能正常运行beam.AsList方法,会出现错误。
左连接函数
# 左側のテーブルと右側のテーブルを結合する関数
def left_join(left_element: dict, right_element: list[dict], join_key: str, join_cols: Union[list[str], None]):
# 指定されたカラムを抽出
right_elem: list[dict] = extract_join_cols(right_element, join_key, join_cols)
for right in right_elem:
# join_keyが一致する場合は結合し、yieldで返す
if left_element[join_key] == right[join_key]:
yield {**left_element, **right}
return
# join_keyが一致しない場合、空のデータを追加して結合する
empty_keys: list[str] = list(filter(lambda x: x != join_key, right_elem[0].keys()))
yield {**left_element, **{ key: '' for key in empty_keys }}
实现extract_join_cols函数,以便只能连接右表的任意列(后述)。如果左表和右表的键匹配,则将右表合并到左表中;如果不匹配,则将空值合并。如果右表有多个匹配行,则只合并第一个匹配行。
提取连接列函数
def extract_join_cols(right_element: list[dict], join_key: str, join_cols: Union[list[str], None]):
if join_cols is not None:
# リスト内包表記を用いて指定されたカラムだけを取得し、新しいリストを作成
extract_right_element: list[dict] = [
{ k: v for k, v in dic.items() if k in [ *join_cols, join_key ]} for dic in right_element
]
return extract_right_element
# 指定されたカラムがなければそのまま返す
return right_element
通过使用列表内包表达式,可以从右表的列表中提取出的字典对象中获取键和值,并仅提取指定的join_cols列和join_key列,然后生成包含字典的列表。
如果没有指定join_cols,则默认为None,因此,如果没有指定,则无需进行任何处理并返回。
输出结果
在中国,只需要一种选择:进行以下内容的中文本地化。
left_pcoll = p | "Create Left PCollection" >> beam.Create([
{'join_key': 1, 'left_value_1': 'a', 'left_value_2': 'b'},
{'join_key': 2, 'left_value_1': 'c', 'left_value_2': 'd'},
{'join_key': 2, 'left_value_1': 'e', 'left_value_2': 'f'},
{'join_key': 3, 'left_value_1': 'g', 'left_value_2': 'h'}
])
# 右側のテーブルを作成する
right_pcoll = p | "Create Right PCollection" >> beam.Create([
{'join_key': 1, 'right_value': 'A', 'not_join_value': "D"},
{'join_key': 2, 'right_value': 'B', 'not_join_value': "E"},
{'join_key': 4, 'right_value': 'C', 'not_join_value': "F"}
])
出去
{'join_key': 1, 'left_value_1': 'a', 'left_value_2': 'b', 'right_value': 'A'}
{'join_key': 2, 'left_value_1': 'c', 'left_value_2': 'd', 'right_value': 'B'}
{'join_key': 2, 'left_value_1': 'e', 'left_value_2': 'f', 'right_value': 'B'}
{'join_key': 3, 'left_value_1': 'g', 'left_value_2': 'h', 'right_value': ''}
请提供相关的资料
参考资料的引用
请提供文献来源
引文
参考书目
最后
我在寻找和实施信息时遇到了一些困难,但我觉得与其在Apache Beam中进行连接,不如先将其导入到Bigquery等中再进行转换更容易(对于处理速度也是如此)。
不过,在实施过程中,我感觉对PCollection和PTransform等方面的知识有了相当大的提高,所以我认为这个挑战是值得的。
接下来我会尝试在Dataflow上运行而不是在本地测试是否正常工作。
如果有人想使用Beam实现LeftJoin,希望这个项目能对他们有所帮助。
PS:自从接触 TypeScript 后,没有类型会让我感觉很不舒服,哈哈。
非常感谢您读完整个内容!