当在Azure Purview中检测到对特定类别的分类时,将发送通知

在这篇文章中要做的事情是什么?

image.png

准备好

请您注意

无论是通过 Teams、邮件或其他方式发送通知都可以,但这次我们选择使用 Slack。我们会事先创建一个通知用的频道,并准备好访问令牌。您可以参考这里的内容。

样本数据

本次测试数据使用了Starter Kit中的文件。我们使用包含信用卡号和电子邮件地址的数据进行分类,并进行相应的通知。

"Password","email","id","creditcard"
"b4c906d48f60cdf6a70801d9c9182793a478e1a3a2cb03542676a759db5f843f","ldorset7c@ucoz.com","5647518a-2a93-4611-9746-f050fd0a19d6","201907394460151"
"c8ee38f64609cdd6c8b80dcf888cc80d49fb25817e78dde5bc1094c42b4fa2d8","nweond2f@yale.edu","575d3815-3747-4aa8-94a6-3e32d9885d9c","5038685806838408"
"2d67e5d4eab9004f745c2eaff169be12b6be6a6816eac70b8497bcfefa092f08","fhowson1n@hhs.gov","ef678e73-0b90-4612-9ad0-00401c24b3c8","67616352246247000"

image.png

实施

Slack客户端的设置。

在预设中指定接入令牌和通知目标的频道ID。

from slack_sdk import WebClient
SLACK_ACCESS_TOKEN = 'アクセス トークン'
CHANNEL_ID = 'チャネルID'

# Slackクライアントを作成
slack_client = WebClient(SLACK_ACCESS_TOKEN)

使用EventHub的配置设置。

可以从Purview账户的属性中获取EventHub连接字符串,该属性名为”Atlas Kafka Endpoint的主要连接字符串”(Endpoint=sb://atlas-xxx…)。EVENTHUB_NAME固定为”atlas_entities”。

import sys
import asyncio
import nest_asyncio # Jupyter Notebookでのエラー対応
from pprint import *
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

# Connection strings
EVENTHUB_CONNECTION_STR = 'EventHubの接続文字列'
EVENTHUB_NAME = 'atlas_entities'
STORAGE_CONNECTION_STR = 'ストレージアカウントの接続文字列'
BLOB_CONTAINER_NAME = 'コンテナ名'

设置一个列表,用于判断从EventHub获取的信息中是否包含信用卡号码(MICROSOFT.FINANCIAL.CREDIT_CARD_NUMBER)和电子邮箱地址(MICROSOFT.PERSONAL.EMAIL)。支持的分类列表可在此处查看,正式名称为Purview Studio。

# 通知対象クラスの指定
class_list = ['MICROSOFT.FINANCIAL.CREDIT_CARD_NUMBER', 'MICROSOFT.PERSONAL.EMAIL']

接收信息并通知Slack

async def on_event(partition_context, event):
    event_data = event.body_as_json()
    eventType = event_data['message']['operationType']

    # クラスが追加もしくは更新された場合
    if eventType == 'CLASSIFICATION_ADD' or eventType == 'CLASSIFICATION_UPDATE':
        classificationName = event_data['message']['entity']['classificationNames'][0]
        # 通知対象のクラスにマッチした場合
        if classificationName in class_list:
            message = f"""\
通知対象の情報({classificationName})が検出されました。
  name={event_data['message']['entity']['attributes']['name']}
  qualifiedName={event_data['message']['entity']['attributes']['qualifiedName']}
            """

            # Slackに通知
            slack_client.chat_postMessage(channel=CHANNEL_ID, text=message, icon_emoji=':whale:', username='Purview Notice')

    await partition_context.update_checkpoint(event)
async def receive(client):
    await client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
async def main():
    print("Starting stream...", file=sys.stderr)
    checkpoint_store = BlobCheckpointStore.from_connection_string(STORAGE_CONNECTION_STR, BLOB_CONTAINER_NAME)
    client = EventHubConsumerClient.from_connection_string(
        EVENTHUB_CONNECTION_STR,
        consumer_group="$Default",
        eventhub_name=EVENTHUB_NAME,
        checkpoint_store=checkpoint_store,  # For load-balancing and checkpoint. Leave None for no load-balancing.
    )
    async with client:
        await receive(client)

由于在Notebook中出现异步处理错误,所以我调用了nest_asyncio.apply()。

if __name__ == '__main__':
    print("Python process started.", file=sys.stderr)
    nest_asyncio.apply()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

请参照以下内容进行本土化的中文表达,只需提供一种选项:

这里有参考资料,包括来自EventHub的异步数据接收。

广告
将在 10 秒后关闭
bannerAds