使用Amazon Managed Kafka (MSK)Data Streams和AWS Glue Streaming ETL对于将数据流式传送到Vantage进行了处理
以下的架构展示了从MSK流向AWS Glue,然后流向Teradata Vantage进行分析,并最终显示在Amazon QuickSight中的数据流。在这个教程中,我们将使用一个简单的Lambda函数来执行名为MSK Producer的任务。
前提条件
请确认使用AWS Glue Streaming ETL的前提条件,首先满足以下条件:在Teradata Vantage中。
为了登录虚拟机,您需要Amazon Elastic Compute Cloud(Amazon EC2)的密钥对。如果您还没有密钥对,我们将为您创建一个新的。以下是创建密钥对的步骤:我们将命名为Teradata.pem,并下载到本地机器上。
・创建Amazon QuickSight帐户(需要订阅)。
步骤
请在满足前提条件的基础上按照以下步骤进行任务操作:
-
- 订阅Teradata Vantage开发者版本。(这个步骤也适用于Vantage作为服务提供的情况)。
启动AWS CloudFormation堆栈,并部署Teradata Vantage和其他所需资源。
在Teradata Vantage中创建用户和可读/写数据库。
使用AWS Glue控制台创建MSK连接和表。
创建Glue流式ETL作业并开始流式处理。
使用Amazon QuickSight可视化加载到Teradata Vantage的数据。
进行清理操作。
步骤1:订阅Teradata Vantage开发版
请按照以下步骤订阅 Teradata Vantage Developer:
AWSアカウントにログインします。
AWS MarketplaceのTeradata Vantage Developer (Free, DIY)のリストにアクセスします。
右上の「Continue to Subscribe」を選択します。
「Accept Terms」を選択します。
选择即表示您同意条件,并可以在您的AWS Marketplace帐户中使用此软件。
步骤2:启动AWS CloudFormation Stack来部署Vantage。
AWS CloudFormation 提供了在云环境中建模和预配置 AWS 和第三方应用程序资源的通用语言。要部署 Vantage,请按照以下步骤进行:
点击“Launch Stack”按钮,将Teradata Vantage Developer Edition与这个教程所需的所有资源一起部署。
在CloudFormation控制台页面上,当输入了模板URL时,请从下拉菜单中选择AWS密钥对“Teradata.pem根据先决条件”。
由于其他参数已经自动输入,所以请向下滚动以确认创建IAM资源,并在复选框中打勾,然后点击“创建堆栈”。
Teradata Vantage Developer Edition现已具备所需的前提条件,包括MSK Cluster、MSK Client节点、Lambda函数和IAM角色,并部署到您的账户中。这可能需要最多20分钟完成。完成部署后,请转到Stack Output选项卡,并将其中列出的所有详细信息记录下来,这将在以后的步骤中需要使用。
步骤3:创建Kafka主题
完成前一部分后,使用.pem密钥登录到Kafka客户端节点,并执行以下步骤来创建完成此教程所需的Kafka主题。
要创建Kafka主题,您需要登录作为CloudFormation部署的一部分创建的Kafka客户端节点。使用以下命令连接到Kafka客户端节点的SSH。
ssh -i ~/Downloads/Teradata.pem ec2-user@: < KafkaClientInstance >
在终端上执行以下命令,在Kafka中创建名为“Say TeraTopic”的主题,并将ZookeeperConnectString替换为MSK集群的“ZK URL”。有关Zookeeper的URL,请参考“获取Zookeeper连接字符串”。
/opt/kafka/bin/kafka-topics.sh --create --zookeeper <ZookeeperConnectString> --replication-factor 1 --partitions 1 --topic TeraTopic
步骤4:使用Glue在MSK中创建连接和目录表。
在下面的步骤中,我们将解释如何设置以创建与 MSK 的连接,并创建一个目录表,用作 Glue 数据流 ETL 作业的源。
让我们创建一个Glue目录连接到MSK集群。
在服务AWS Glue Catalog Connection中导航到“添加连接”,点击“添加连接”。在连接属性页面中,将连接命名为“MSK_Connection”,连接类型设为“Kafka”,并输入运行在9094端口的MSK SSL引导URL。您可以在“获取引导经纪人信息”步骤中找到MSK经纪人URL。(或许使用AWS控制台会更方便。)
点击下一步。
在用于设置数据存储的“连接访问”屏幕上,从下拉菜单中选择名为“VANTAGE-VPC”的VPC。选择子网名称为“MMPrivateSubnetOne”,选择安全组名称前缀为“VantageStreamingSG”。
点击「下一步」,确认后点击「完成」来创建连接。
让我们创建一个MSK主题表。点击目录中的“表”,然后点击“添加表”。选择“手动添加表”。在下一个界面中输入名称“TeraTopic”。从下拉菜单中选择数据库。如果还没有创建数据库,请参考Glue数据库的使用方法创建数据库。
在数据存储页面上,将”Type of Source”设置为”Kafka”。”Topic Name”选择前面创建的”TeraTopic”,”Connection”选择”MSK_Connection”。点击”Next”继续进行下一步。
在下一页中,选择 Classification 为 ‘JSON’,然后点击下一步。在模式定义页面,点击 ‘添加列’,并指定以下类型的列名。
点击“下一步”以确认,然后在下一个界面点击“完成”,即可完成 MSK 表的创建。
步骤5:创建将数据从MSK流式传输到Vantage的Glue流ETL作业的方法。
您可以按照以下步骤将Teradata JDBC驱动程序下载后加载到Amazon S3的任意位置,然后在Glue流式ETL作业中使用它来连接Vantage数据库。
-
- 最新のTeradata JDBCドライバをダウンロードします。
-
- ダウンロードしたファイルからtdjdcb4.jarを解凍します。
-
- Amazon S3バケットを作成します。
- S3バケットにtdjdbc4.jarをアップロードします。
让我们从AWS Glue的ETL作业选项卡中创建流式ETL作业。点击左侧面板上的”Jobs”,然后点击”Add Job”按钮。
在接下来的一页上
-
- Name:MSK2Teradata
-
- IAM Role:ドロップダウンから TeradataGlueRole-MSK
-
- Type:Spark Streaming
-
- This Job Runs:Proposed Script generated by AWS Glue
- として指定します。他はデフォルトのまま、下にスクロールします。
在同一个窗口中选择“安全配置、脚本库和作业参数(可选)”,然后展开该部分。
在Dependent jars path字段中输入S3桶路径和Teradata JDBC驱动的键名。格式应为s3://<您的桶名>/terajdbc4.jar。
保留的参数保持默认值,滚动向下选择“下一步”。
显示数据源窗格。选择使用MSK主题创建的TeraTopic表的单选按钮,并点击”Next”。
在中国以及其他中国讲母语的地区,会显示数据目标面板。选择相同的TeraTopic并点击“下一步”。在这里,我们将使用脚本来更改目标。
在下一个窗口中,将显示源列和目标列之间的映射关系。无需进行任何更改。
点击「保存工作」并在脚本中进行以下更改:
- 34行目で、windowSize を100秒 から5秒 に変更します。
- 32行目で「datasink1=」の行を複製し、元のバージョンをコメントアウトします。これらの値を使用して、Teradata JDBCドライバの詳細を示す以下のスニペットを追加し、この中でvantage ip/hostnameを更新することを確認します。
datasink1 = glueContext.write_dynamic_frame.from_options(frame = apply_mapping, connection_type = "jdbc", connection_options = {"url": "jdbc:teradata://ec2-XX-YYY-ZZZ-AA.us-west-2.compute.amazonaws.com/DATABASE=GlueDB,TMODE=ANSI","driver":"com.teradata.jdbc.TeraDriver","dbtable": "TeraTopic","database": "GlueDB","user": "GlueUser","password": "aws"}, transformation_ctx = "datasink1")
点击页面顶部的“保存”按钮。最后点击“运行作业”开始从MSK传输数据到Vantage。启动作业可能需要几分钟时间。
第六步:启动Lambda流模拟器 – MSK生产者。
现在请您回到CloudFormation资源页面,点击VantageMSKProducer物理ID的链接,启动Lambda控制台。
在Lambda控制台中,点击右上方的“Test”按钮,模拟流式数据。
在configure test event页面中,将会显示configure test event画面。在这个页面中,将提供一个格式化的JSON记录,用于模拟器执行的字段列表。将YOUR_MSK_BOOTSTRAP_SERVER_LIST替换为“Bootstrap Broker String TLS”,并指定测试事件的名称,然后点击“Save”创建测试事件。
{
"BOOTSTRAP_SERVERS": "<YOUR_MSK_BOOTSTRAP_SERVER_LIST>",
"TOPIC": "TeraTopic",
"BUCKET": "streaming-data-repo"
}
数据将被流式传输到存储桶中。点击保存按钮。保存完成后,再次点击测试按钮,启动模拟器并将数据流式传输到所设置的MSK主题。点击后,模拟器将运行2分钟,然后由于超时而出现错误。[超时可以通过Lambda设置在控制台上进行调整。为了避免资源消耗,流式传输被设置为停止]。
第七步:使用亚马逊QuickSight对数据进行可视化和分析。
可以对加载到Vantage中的数据应用各种分析。但是在这个例子中,我们将重点放在使用QuickSight来可视化加载到Vantage中的数据的方法。
首先打开Amazon QuickSight并创建新的数据集。
从数据集列表中选择“Teradata”。将出现一个弹出窗口。
-
- 「Database Name」フィールドに、データベースサーバーのVantageインスタンスのDNS 名と手順3で作成したデータベースのポート(1025)およびデータベース認証情報を入力します。
-
- 「Validate Connection」を選択してパラメータの正しさを確認します。接続が確立され検証されるとその横に緑色のチェックマークが表示されます。
- 同じポップアップウィンドウで、「Create Data Source」を選択します。
当数据源被创建后,Amazon QuickSight将会识别Vantage中的表格。
-
- TeraTopicテーブルを選択し、ポップアップウィンドウから「Use Custom SQL」を選択します。
-
- クエリの名前を指定しクエリとして「select * from TeraTopic」を入力し「Confirm Query」をクリックします。
- 読み込んだら「Edit/Preview data」をクリックします。以下のようにデータが読み込まれます。
- 日付フィールドのデータ型を必要に応じて変更するか計算フィールドを作成してQuickSightを使用してデータの可視化を開始することができます。
请参考本文档了解如何使用Amazon QuickSight创建AutoGraph的可视化。
步骤8:清理工作
请确保按照本文中的资源创建,不会产生额外费用,并删除AWS CloudFormation的堆栈。请转到CloudFormation控制台,然后在“Stacks”中删除创建的堆栈。同时停止已创建的Glue作业,并删除连接、数据库、表和创建的Glue作业。
最后
在本次博客中,我们学习了如何使用AWS Glue设置自定义数据库连接器,使用Streaming ETL将数据加载到Teradata Vantage中的MSK,以及使用Amazon QuickSight直接可视化结果的方法。请务必尝试!
请咨询有关Teradata Vantage的问题。
关于Teradata Vantage的查询