通过Terraform实现的Databricks工作流

通过Terraform实现Databricks工作流程 – Databricks博客的翻译。

该书为抄译版本,不能保证内容的准确性。请参考原文以获取准确的内容。

这是有关通过Terraform部署工作流程的博客系列的第1部分。我们将使用Terraform的基础架构即代码来介绍如何在Databricks上从零开始创建复杂的作业和工作流程。

通过Databricks工作流程的用户界面可以轻松直观地处理数据加工过程。选择代码,选择计算资源,定义任务之间的依赖关系,然后安排作业或工作流程的时间表。如有需要,还可以立即启动。就是这么简单。许多小团队经常对使用工作流程来构建数据工程管道和机器学习管道的速度感到惊讶。

然而,有一天,这些小团队开始变得壮大。而且,随着他们的壮大,他们的合作方式也需要进化。下面是他们可能遇到的新情景和挑战的示例:

継続的インテグレーション / 継続的デリバリー (CI/CD)

あるDatabricks環境から別の環境にどのようにジョブを複製するのか?
ワークフローが同期され続けていることをどのように保障するのか?特にこれはディザスターリカバリーのシナリオで重要となります。
ワークフローの設定が変更された際、全ての環境のレプリカに対して変更をどのようにロールアウトするのか?

アプリケーション開発とメンテナンス

開発サイクルを通じて、ワークフローをどのようにバージョン管理し、どのように変更を追跡するのか?
どのようにワークフローをテンプレートとして活用し、それからさらに複雑なワークフローをフォークするのか?
どのようにワークフローをモジュール化し、さまざまなチームが自身のパーツを持てるようにできるのか?

解决这些问题的方法是将工作流配置转换为 ‘代码’,并通过使用代码库进行版本控制。开发人员可以从代码库中创建分支或者派生来生成新的工作流(或者更新现有的工作流),然后通过CI/CD自动化进行部署。一旦充分模块化,不同的团队就可以同时使用不同的工作流模块进行工作。听起来很吸引人,但实际上 Workflow as Code 是什么呢?为了理解这一点,让我们首先看一下Databricks工作流的组成部分。

请注意,从历史上看,Job 在 Databricks 中被广泛使用,并成为一个即可用的 Orchestration 引擎。最近推出的 Workflow 功能进一步推动了 Job 的功能,并将其纳入了 Orchestration 工具的系列中。在 Workflow 下,您可以利用 Job、Delta Live Tables 管道的 Orchestration、高级通知功能、用于执行历史分析的仪表盘以及迅速增加的一系列功能。由于与先前的功能兼容,本文中 Workflow 和 Job 这两个关键词将被同样地使用。

Databricks工作流程

Tasks标签以非常优雅的方式显示任务之间的关系,但在实际操作中会涉及到各种调整和规定。对于那些需要与多个团队一起完成大规模工作的企业来说,高效管理这些调整和规定的需求非常重要。要了解此问题的程度,就需要理解内部工作流程的运作方式。

工作流由一个或多个任务组成,用于实现业务逻辑。每个任务需要访问代码。这些代码将在计算集群上执行。然后,集群需要有关Databricks运行时、实例类型和安装的库的详细信息。如果任务失败会发生什么?谁会受到通知?是否需要实现重试功能?此外,作业还需要元数据来指示Databricks如何启动。可以手动启动,也可以通过外部触发器(基于时间或事件)启动。还需要设置允许的同时运行数以及与管理者相关的权限设置。

工作流程的用户界面提供了一种直观且易于理解的方式来执行这些指示。但在许多团队中,他们可能需要管理版本并部署到多个环境的工作流程的代码版本。此外,他们还希望将此代码模块化,以便各个组件可以独立演化。例如,假设他们希望保留用于创建特定类型的集群的模块,例如”my_preferred_job_cluster_specifications”。在进行工作流程的调度时,他们只需指定对该规范对象的引用,而无需每次都明确指定集群配置的元数据。

解决方案是什么?欢迎来到基础设施即代码(IaC)和Terraform。

Terraform和IaC

在Databricks的基础设施中 – 是鸟还是飞机?

在Databricks的背景下,”cluster”(集群),”notebook”(笔记本),”workspace”(工作区)等术语实际上具有什么意义呢?从某种意义上说,这就是全部,也更甚于此。Databricks的对象,如用户、笔记本、作业、集群、工作区、仓库、秘钥等,在Terraform的术语中都被称为基础架构。更准确的术语应该是资源。Terraform Databricks提供商是一个插件,为Databricks提供了用于部署这些资源的模板。从Databricks自身的部署开始,通过这个插件可以实现几乎所有Databricks资源的部署和管理。以下是一个名为”shared_autoscaling”的资源示例,它指定了Databricks集群资源的HashiCorp语言(也称为Terraform语言)。本文展示了用于在AWS上部署基础架构的代码片段。

data "databricks_node_type" "smallest" {
  local_disk = true
}

data "databricks_spark_version" "latest_lts" {
  long_term_support = true
}

resource "databricks_cluster" "shared_autoscaling" {
  cluster_name            = "Shared Autoscaling"
  spark_version           = data.databricks_spark_version.latest_lts.id
  node_type_id            = data.databricks_node_type.smallest.id
  autotermination_minutes = 20
  autoscale {
    min_workers = 1
    max_workers = 50
  }
}

通过 Terraform 部署多任务作业资源

这些组件将被展开并部署在三个步骤中。

    1. 提供者的设置和Databricks的认证。

 

    1. 解决所有上游资源依赖关系,如笔记本、存储库、交互式群集、Git凭据和init脚本。

 

    创建临时作业群集、任务、依赖任务、通知详情、计划、重试策略等作业组件。

使用Databricks进行设置和认证

使用Terraform Databricks提供程序的第一步是将提供程序的二进制文件添加到项目的工作目录中。为此,您需要在工作目录中创建一个名为.tf的文件,其中包含以下内容(请从发布历史中选择您喜欢的提供程序版本),然后运行命令terraform init。

terraform {
  required_providers {
    databricks = {
      source = "databricks/databricks"
      version = "1.6.1" # provider version
    }
  }
}

为了使Terraform能够通过Databricks Workspace进行认证并进行基础设施的配置,需要将用于创建工作文件夹的令牌详细信息记录在.tf文件中。

provider "databricks" {
 host  = "https://my-databricks-workspace.cloud.databricks.com"
 token = "my-databricks-api-token"
}

有关创建Databricks API令牌的方法,请参阅此文档。有关其他认证设置方法,请参阅此处。请注意,不推荐在明文中硬编码凭据。这仅用于演示目的。强烈建议使用支持加密的Terraform后端。您可以使用环境变量,~/.databrickscfg文件,加密的.tfvars文件或您喜欢的秘密存储(Hashicorp Vault、AWS Secrets Manager、AWS Param Store、Azure Key Vault)。

上游资源依赖关系的部署

通过下载Databricks提供程序的二进制文件并设置令牌文件,Terraform可以将资源部署到令牌文件中指定的工作区。重要的是,需要配置所有与作业相关的资源。

    ジョブに含まれるタスクがインタラクティブクラスターを使用している場合、最初にクラスターをデプロイする必要があります。これによって、ジョブのTerraformコードはインタラクティブクラスターのidを取得し、引数existing_cluster_idに引き渡せるようになります。
data "databricks_current_user" "me" {}
data "databricks_spark_version" "latest" {}
data "databricks_spark_version" "latest_lts" {
 long_term_support = true
}
data "databricks_node_type" "smallest" {   
 local_disk = true
}

# create interactive cluster
resource "databricks_cluster" "my_interactive_cluster" {
 cluster_name            = "my_favorite_interactive_cluster"
 spark_version           = data.databricks_spark_version.latest_lts.id
 node_type_id            = data.databricks_node_type.smallest.id
 autotermination_minutes = 20
 autoscale {
   min_workers = 1
   max_workers = 2
 }
}
# create a multi-task job
resource "databricks_job" "my_mtj" {
 name = "Job with multiple tasks"
   task {
       # arguments to create a task
      
       # reference the pre-created cluster here
       existing_cluster_id = "${databricks_cluster.my_interactive_cluster.id}"

   }
}
    ジョブのタスクがワークスペースやDatabricks Repoのコードを使用している場合、最初にノートブックやRepoをデプロイする必要があります。Repoやノートブックは、IAM(Identity and Access Management)やGit資格情報などの上流の依存関係を持っている場合があることに注意してください。事前にこれらをプロビジョンしてください。
data "databricks_current_user" "me" { } 

# notebook will be copied from local path
# and provisioned in the path provided
# inside Databricks Workspace
resource "databricks_notebook" "my_notebook" { 
  source = "${path.module}/my_notebook.py" 
  path = "${data.databricks_current_user.me.home}/AA/BB/CC" 
}

クラスターポリシー 、インスタンスプール、Delta Live Tablesパイプラインは上流のリソース依存関係です。これらを使用している場合には、事前に解決しておく必要があります。

部署工作组件

从创建databricks_job资源的容器开始。请注意设置了诸如调度、最大同时运行数等作业级参数。

resource "databricks_job" "name_of_my_job" {
 name = "my_multi_task_job"
 max_concurrent_runs = 1

 # job schedule
 schedule {
   quartz_cron_expression = "0 0 0 ? 1/1 * *" # cron schedule of job
   timezone_id = "UTC"
  }

 # notifications at job level
 email_notifications {
   on_success = ["111@abc.com", "222@abc.com"]
     on_start   = ["222@abc.com"]
     on_failure = ["my_distribution_list@abc.com"]
 }

 # reference to git repo. Add the git credential separately
 # through a databricks_git_credential resource
 git_source {
   url      = "https://github.com/udaysat-db/test-repo.git"
   provider = "gitHub"
   branch   = "main"
 }

 # Create blocks for Jobs Clusters here #

 # Create blocks for Tasks here #
}

在接下来的步骤中,我们会创建一个与此任务寿命相关的临时作业集群,即作业集群。与此同时,互动集群会在事先创建,在作业范围之外共享资源。

# this ephemeral cluster can be shared among tasks
# stack as many job_cluster blocks as you need
 job_cluster {
   new_cluster {
     spark_version = "10.4.x-scala2.12"
     spark_env_vars = {
       PYSPARK_PYTHON = "/databricks/python3/bin/python3"
     }
     num_workers        = 8
     data_security_mode = "NONE"
     aws_attributes {
       zone_id                = "us-west-2a"
       spot_bid_price_percent = 100
       first_on_demand        = 1
       availability           = "SPOT_WITH_FALLBACK"
     }
   }
   job_cluster_key = "Shared_job_cluster"
 }

让我们来创建一个任务块。这个任务块要使用工作区的笔记本和之前定义的共享作业集群。请注意如何使用base_parameters来提供任务的输入参数。

task {
   task_key = "name_of_my_first_task" # this task depends on nothing

   notebook_task {
     notebook_path = "path/to/notebook/in/Databricks/Workspace" # workspace notebook
   }

   job_cluster_key = "Shared_job_cluster" # use ephemeral cluster created above

   # input parameters passed into the task
   base_parameters = {
       my_bool   = "True"
       my_number = "1"
       my_text   = "hello"
     }

   # notifications at task level
   email_notifications {
     on_success = ["111@abc.com", "222@abc.com"]
     on_start   = ["222@abc.com"]
     on_failure = ["my_distribution_list@abc.com"]
   }
 }

这是一个指向(Job容器定义的)远程git存储库的任务。关于计算资源,此任务将使用交互式集群。请注意pip库的使用方法以及超时和重试设置。

task {
   task_key = "name_of_my_git_task" # reference git repo code

   notebook_task {
     notebook_path = "nb-1.py" # relative to git root
   }

   existing_cluster_id = "id_of_my_interactive_cluster" # use a pre existing cluster

   # you can stack multiple depends_on blocks
   depends_on {
     task_key = "name_of_my_first_task"
   }

   # libraries needed
   library {
     pypi {
       package = "faker"
     }
   }

   # timeout and retries
   timeout_seconds = 1000
   min_retry_interval_millis = 900000
   max_retries = 1
 }

最后,这是使用Delta Live Tables管道的任务块。需要单独创建此管道。

task {
   task_key = "dlt-pipeline-task"
  
   pipeline_task {
     pipeline_id = "id_of_my_dlt_pipeline"
   }
  
   # depends on multiple tasks
   depends_on {
     task_key = "name_of_my_first_task"
   }
   depends_on {
     task_key = "name_of_my_git_task"
   }
 }

任务类型,集群类型和其他属性的顺序和组合是无限的。但是,希望上述模式能够帮助你合理构建复杂的多任务作业/工作流程,利用这些构建模块。一旦编写了Terraform代码,您可以使用以下命令操作资源。

コマンド説明terraform init他のコマンドを使用するために作業ディレクトリを準備します。terraform validate設定が妥当かどうかをチェックします。terraform plan現在の設定に必要な変更点を表示します。terraform applyインフラストラクチャを作成、更新します。terraform destroy以前作成したインフラストラクチャを破壊します。

总结

Terraform是一种强大的基础设施即代码工具,可用于在Databricks上部署资源。通过连接大量资源来构建多任务工作流,为团队提供了在创建作业、任务和群集的模块化模板时非常灵活的能力。这些模板可进行版本管理、共享和重复使用,可以快速在企业内部部署这些模板。如上文所述,习惯于使用Terraform的开发人员可以轻松地从头开始创建工作流,但数据工程师和数据科学家可能仍然更喜欢在用户界面上创建工作流。在这种情况下,Terraform开发人员可以继承已创建的工作流。那么继承的工作流是什么样的?它可以进一步被重复使用和演化吗?在本系列文章的下一篇中讨论这些情景。

开始使用

学习使用Terraform
Databricks Terraform提供程序

Databricks免费试用

DataBricks 免费试用

广告
将在 10 秒后关闭
bannerAds