当在Azure Purview中检测到对特定类别的分类时,将发送通知
在这篇文章中要做的事情是什么?
准备好
请您注意
无论是通过 Teams、邮件或其他方式发送通知都可以,但这次我们选择使用 Slack。我们会事先创建一个通知用的频道,并准备好访问令牌。您可以参考这里的内容。
样本数据
本次测试数据使用了Starter Kit中的文件。我们使用包含信用卡号和电子邮件地址的数据进行分类,并进行相应的通知。
"Password","email","id","creditcard"
"b4c906d48f60cdf6a70801d9c9182793a478e1a3a2cb03542676a759db5f843f","ldorset7c@ucoz.com","5647518a-2a93-4611-9746-f050fd0a19d6","201907394460151"
"c8ee38f64609cdd6c8b80dcf888cc80d49fb25817e78dde5bc1094c42b4fa2d8","nweond2f@yale.edu","575d3815-3747-4aa8-94a6-3e32d9885d9c","5038685806838408"
"2d67e5d4eab9004f745c2eaff169be12b6be6a6816eac70b8497bcfefa092f08","fhowson1n@hhs.gov","ef678e73-0b90-4612-9ad0-00401c24b3c8","67616352246247000"
实施
Slack客户端的设置。
在预设中指定接入令牌和通知目标的频道ID。
from slack_sdk import WebClient
SLACK_ACCESS_TOKEN = 'アクセス トークン'
CHANNEL_ID = 'チャネルID'
# Slackクライアントを作成
slack_client = WebClient(SLACK_ACCESS_TOKEN)
使用EventHub的配置设置。
可以从Purview账户的属性中获取EventHub连接字符串,该属性名为”Atlas Kafka Endpoint的主要连接字符串”(Endpoint=sb://atlas-xxx…)。EVENTHUB_NAME固定为”atlas_entities”。
import sys
import asyncio
import nest_asyncio # Jupyter Notebookでのエラー対応
from pprint import *
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
# Connection strings
EVENTHUB_CONNECTION_STR = 'EventHubの接続文字列'
EVENTHUB_NAME = 'atlas_entities'
STORAGE_CONNECTION_STR = 'ストレージアカウントの接続文字列'
BLOB_CONTAINER_NAME = 'コンテナ名'
设置一个列表,用于判断从EventHub获取的信息中是否包含信用卡号码(MICROSOFT.FINANCIAL.CREDIT_CARD_NUMBER)和电子邮箱地址(MICROSOFT.PERSONAL.EMAIL)。支持的分类列表可在此处查看,正式名称为Purview Studio。
# 通知対象クラスの指定
class_list = ['MICROSOFT.FINANCIAL.CREDIT_CARD_NUMBER', 'MICROSOFT.PERSONAL.EMAIL']
接收信息并通知Slack
async def on_event(partition_context, event):
event_data = event.body_as_json()
eventType = event_data['message']['operationType']
# クラスが追加もしくは更新された場合
if eventType == 'CLASSIFICATION_ADD' or eventType == 'CLASSIFICATION_UPDATE':
classificationName = event_data['message']['entity']['classificationNames'][0]
# 通知対象のクラスにマッチした場合
if classificationName in class_list:
message = f"""\
通知対象の情報({classificationName})が検出されました。
name={event_data['message']['entity']['attributes']['name']}
qualifiedName={event_data['message']['entity']['attributes']['qualifiedName']}
"""
# Slackに通知
slack_client.chat_postMessage(channel=CHANNEL_ID, text=message, icon_emoji=':whale:', username='Purview Notice')
await partition_context.update_checkpoint(event)
async def receive(client):
await client.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
async def main():
print("Starting stream...", file=sys.stderr)
checkpoint_store = BlobCheckpointStore.from_connection_string(STORAGE_CONNECTION_STR, BLOB_CONTAINER_NAME)
client = EventHubConsumerClient.from_connection_string(
EVENTHUB_CONNECTION_STR,
consumer_group="$Default",
eventhub_name=EVENTHUB_NAME,
checkpoint_store=checkpoint_store, # For load-balancing and checkpoint. Leave None for no load-balancing.
)
async with client:
await receive(client)
由于在Notebook中出现异步处理错误,所以我调用了nest_asyncio.apply()。
if __name__ == '__main__':
print("Python process started.", file=sys.stderr)
nest_asyncio.apply()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
请参照以下内容进行本土化的中文表达,只需提供一种选项:
这里有参考资料,包括来自EventHub的异步数据接收。