和风网标志

使用 Amazon Comprehend 和 Amazon Kinesis Data Firehose 近乎实时地编辑来自流数据的敏感数据

日期:

数据和洞察力的近实时交付使企业能够快速响应客户的需求。 实时数据可以来自多种来源,包括社交媒体、物联网设备、基础设施监控、呼叫中心监控等。 由于从多个来源摄取数据的广度和深度,企业寻找解决方案来保护客户的隐私并防止敏感数据从终端系统被访问。 您以前必须依赖可能标记误报或丢失数据的个人身份信息 (PII) 规则引擎,或者您必须构建和维护自定义机器学习 (ML) 模型来识别流数据中的 PII。 您还需要实施和维护支持这些引擎或模型所需的基础设施。

为了帮助简化此过程并降低成本,您可以使用 亚马逊领悟,一种自然语言处理 (NLP) 服务,它使用 ML 在非结构化文本中查找洞察力和关系,例如人物、地点、情绪和主题。 您现在可以使用 Amazon Comprehend ML 功能来检测和编辑客户电子邮件、支持票证、产品评论、社交媒体等中的 PII。 不需要机器学习经验。 例如,您可以分析支持票证和知识文章以检测 PII 实体并在索引文档之前编辑文本。 之后,文档不再包含 PII 实体,用户可以使用数据。 编辑 PII 实体可帮助您保护客户的隐私并遵守当地法律法规。

在这篇文章中,您将了解如何将 Amazon Comprehend 实施到您的流式传输架构中,以使用以下方法近乎实时地编辑 PII 实体 亚马逊 Kinesis 数据流水线AWS Lambda.

这篇文章的重点是编辑使用 Kinesis Data Firehose 提取到流式架构中的选定字段中的数据,您希望在其中创建、存储和维护数据的其他衍生副本以供最终用户或下游应用程序使用。 如果你正在使用 Amazon Kinesis数据流 或在 PII 编辑之外有其他用例,请参阅 使用带有 Amazon Kinesis Data Analytics、Amazon Translate 和 Amazon Comprehend 的 SQL 函数翻译、编辑和分析流数据,我们在这里展示了如何使用 Amazon Kinesis 数据分析工作室 powered by 阿帕奇·齐柏林Apache Flink 以交互方式分析、翻译和编辑流数据中的文本字段。

解决方案概述

下图显示了实时执行流数据的 PII 编辑的示例架构,使用 亚马逊简单存储服务 (Amazon S3), Kinesis Data Firehose 数据转换, 亚马逊领悟AWS Lambda. 此外,我们使用 适用于Python的AWS开发工具包(Boto3) 对于 Lambda 函数。 如图所示,S3 原始存储桶包含未编辑的数据,而 S3 编辑的存储桶包含使用 Amazon Comprehend 后的编辑数据 DetectPiiEntities Lambda 函数中的 API。

涉及费用

除了 Kinesis Data Firehose、Amazon S3 和 Lambda 成本外,此解决方案还将产生来自 Amazon Comprehend 的使用成本。 您支付的金额是包含 PII 和 Lambda 函数处理的字符的记录总数的一个因素。 有关详细信息,请参阅 Amazon Kinesis Data Firehose 定价, 亚马逊全面定价AWS Lambda定价.

例如,假设您有 10,000 条日志记录,并且您要从中编辑 PII 的键值是 500 个字符。 在 10,000 条日志记录中,有 50 条被标识为包含 PII。 费用明细如下:

包含 PII 成本:

  • 每个键值的大小 = 500 个字符(1 个单元 = 100 个字符)
  • 每条记录的单位数(100 个字符)(最少 3 个单位)= 5
  • 总单位数 = 10,000(记录)x 5(每条记录的单位数)x 1(每条记录的 Amazon Comprehend 请求)= 50,000
  • 每单位价格= $ 0.000002
    • 使用 ContainsPiiEntities API 识别具有 PII 的日志记录的总成本 = 0.1 USD [50,000 个单位 x 0.000002 USD] 

编辑 PII 费用:

  • 包含 PII 的总单位数 = 50(记录)x 5(每条记录的单位数)x 1(每条记录的 Amazon Comprehend 请求数)= 250
  • 每单位价格= $ 0.0001
    • 使用 DetectPiiEntities API 识别 PII 位置的总成本 = [单位数量] x [每单位成本] = 250 x 0.0001 美元 = 0.025 美元

识别和编辑的总成本:

  • 总成本:0.1 美元(字段包含 PII 时的验证)+ 0.025 美元(编辑包含 PII 的字段)= 0.125 美元

使用 AWS CloudFormation 部署解决方案

对于这篇文章,我们提供了一个 AWS CloudFormation 流数据编辑 模板,它提供了实现的完整细节,以实现可重复的部署。 部署后,此模板会创建两个 S3 存储桶:一个用于存储从 Amazon Kinesis Data Generator (KDG) 提取的原始样本数据,另一个用于存储编辑数据。 此外,它还会创建一个 Kinesis Data Firehose 传输流,其中包含 DirectPUT 作为输入,以及调用 Amazon Comprehend 的 Lambda 函数 包含Pii实体检测Pii实体 用于识别和编辑 PII 数据的 API。 Lambda 函数依赖于环境变量中的用户输入来确定需要检查哪些键值以查找 PII。

此解决方案中的 Lambda 函数将有效负载大小限制为 100 KB。 如果提供了文本大于 100 KB 的有效负载,则 Lambda 函数将跳过它。

要部署解决方案,请完成以下步骤:

  1. 在美国东部(弗吉尼亚北部)启动 CloudFormation 堆栈 us-east-1:
  2. 输入堆栈名称,并将其他参数保留为默认值
  3. 选择 我承认AWS CloudFormation可能会使用自定义名称创建IAM资源。
  4. 创建堆栈.

手动部署资源

如果您更喜欢手动构建架构而不是使用 AWS CloudFormation,请完成本部分中的步骤。

创建 S3 存储桶

使用以下步骤创建您的 S3 存储桶:

  1. 在Amazon S3控制台上,选择 水桶 在导航窗格中。
  2. 创建存储桶.
  3. 为您的原始数据创建一个存储桶,为您的编辑数据创建一个存储桶。
  4. 记下您刚刚创建的存储桶的名称。

创建 Lambda 函数

要创建和部署 Lambda 函数,请完成以下步骤:

  1. 在Lambda控制台上,选择 创建功能.
  2. 从头开始.
  3. 针对 功能名称,输入 AmazonComprehendPII-Redact.
  4. 针对 运行时,选择 Python的3.9.
  5. 针对 建筑, 选择 x86_64.
  6. 针对 执行角色, 选择 创建具有 Lambda 权限的新角色.
  7. 创建函数后,输入以下代码:
    import json
    import boto3
    import os
    import base64
    import sys
    
    def lambda_handler(event, context):
        
        output = []
        
        for record in event['records']:
            
            # Gathers keys from enviroment variables and makes a list of desired keys to check for PII
            rawkeys = os.environ['keys']
            splitkeys = rawkeys.split(", ")
            print(splitkeys)
            #decode base64
            #Kinesis data is base64 encoded so decode here
            payloadraw=base64.b64decode(record["data"]).decode('utf-8')
            #Loads decoded payload into json
            payloadjsonraw = json.loads(payloadraw)
            
            # Creates Comprehend client
            comprehend_client = boto3.client('comprehend')
            
            
            # This codes handles the logic to check for keys, identify if PII exists, and redact PII if available. 
            for i in payloadjsonraw:
                # checks if the key found in the message matches a redact
                if i in splitkeys:
                    print("Redact key found, checking for PII")
                    payload = str(payloadjsonraw[i])
                    # check if payload size is less than 100KB
                    if sys.getsizeof(payload) < 99999:
                        print('Size is less than 100KB checking if value contains PII')
                        # Runs Comprehend ContainsPiiEntities API call to see if key value contains PII
                        pii_identified = comprehend_client.contains_pii_entities(Text=payload, LanguageCode='en')
                        
                        # If PII is not found, skip over key
                        if (pii_identified['Labels']) == []:
                            print('No PII found')
                        else:
                        # if PII is found, run through redaction logic
                            print('PII found redacting')
                            # Runs Comprehend DetectPiiEntities call to find exact location of PII
                            response = comprehend_client.detect_pii_entities(Text=payload, LanguageCode='en')
                            entities = response['Entities']
                            # creates redacted_payload which will be redacted
                            redacted_payload = payload
                            # runs through a loop that gathers necessary values from Comprehend API response and redacts values
                            for entity in entities:
                                char_offset_begin = entity['BeginOffset']
                                char_offset_end = entity['EndOffset']
                                redacted_payload = redacted_payload[:char_offset_begin] + '*'*(char_offset_end-char_offset_begin) + redacted_payload[char_offset_end:]
                            # replaces original value with redacted value
                            payloadjsonraw[i] = redacted_payload
                            print(str(payloadjsonraw[i]))
                    else:
                        print ('Size is more than 100KB, skipping inspection')
                else:
                    print("Key value not found in redaction list")
            
            redacteddata = json.dumps(payloadjsonraw)
            
            # adds inspected record to record
            output_record = {
                'recordId': record['recordId'],
                'result': 'Ok',
                'data' : base64.b64encode(redacteddata.encode('utf-8'))
            }
            output.append(output_record)
            print(output_record)
            
        print('Successfully processed {} records.'.format(len(event['records'])))
        
        return {'records': output}

  8. 部署.
  9. 在导航窗格中,选择 组态。
  10. 导航 环境变量.
  11. 编辑.
  12. 针对 ,输入 keys.
  13. 针对 , 输入要从中编辑 PII 的键值,以逗号和空格分隔。 例如,输入 Tweet1, Tweet2 如果您使用的是本文下一部分中提供的示例测试数据。
  14. 保存.
  15. 导航 一般配置.
  16. 编辑.
  17. 更改的值 超时 到 1 分钟。
  18. 保存.
  19. 导航 权限.
  20. 选择下面的角色名称 执行角色.
    您已重定向到 AWS身份和访问管理 (IAM) 控制台。
  21. 针对 添加权限,选择 附加政策.
  22. 输入 Comprehend 进入搜索栏并选择政策 ComprehendFullAccess.
  23. 附加政策.

创建 Firehose 传输流

要创建 Firehose 交付流,请完成以下步骤:

  1. 在Kinesis Data Firehose控制台上,选择 创建传递流.
  2. 针对 来源, 选择 直接PUT.
  3. 针对 目的地, 选择 Amazon S3.
  4. 针对 传输流名称,输入 ComprehendRealTimeBlog.
  5. 使用 AWS Lambda 转换源记录, 选择 启用.
  6. 针对 AWS Lambda函数,输入您创建的函数的 ARN,或浏览到函数 AmazonComprehendPII-Redact.
  7. 针对 缓冲区大小,将值设置为 1 MB。
  8. 针对 缓冲间隔, 保留为 60 秒。
  9. 目的地设置,选择您为编辑数据创建的 S3 存储桶。
  10. 备份设置,选择您为原始记录创建的 S3 存储桶。
  11. 允许,创建或更新 IAM 角色,或选择具有适当权限的现有角色。
  12. 创建传递流.

使用 Kinesis Data Generator 部署流数据解决方案

您可以使用 Kinesis Data Generator (KDG) 将样本数据提取到 Kinesis Data Firehose 并测试解决方案。 为了简化这个过程,我们提供了一个 Lambda 函数和 CloudFormation 模板来创建一个 亚马逊Cognito 用户并分配适当的权限以使用 KDG。

  1. 点击 Amazon Kinesis 数据生成器页面,选择 使用 CloudFormation 创建 Cognito 用户.您将被重定向到 AWS CloudFormation 控制台以创建您的堆栈。
  2. 为您登录到 KDG 的用户提供用户名和密码。
  3. 将其他设置保留为默认值并创建您的堆栈。
  4. 点击 输出 选项卡,选择 KDG UI 链接。
  5. 请输入登录用户名和密码。

在 Amazon S3 中发送测试记录并验证编辑

要测试解决方案,请完成以下步骤:

  1. 登录到您在上一步中创建的 KDG URL。
  2. 选择部署 AWS CloudFormation 堆栈的区域。
  3. 针对 流/交付流, 选择您创建的交付流(如果您使用模板,它的格式为 accountnumber-awscomprehend-blog).
  4. 将其他设置保留为默认值。
  5. 对于记录模板,您可以创建自己的测试,或使用以下模板。如果您使用下面提供的示例数据进行测试,您应该在 AmazonComprehendPII-Redact Lambda 函数 Tweet1, Tweet2. 如果通过 CloudFormation 部署,请将环境变量更新为 Tweet1, Tweet2 在创建的 Lambda 函数中。 样本测试数据如下:
    {"User":"12345", "Tweet1":" Good morning, everybody. My name is Van Bokhorst Serdar, and today I feel like sharing a whole lot of personal information with you. Let's start with my Email address SerdarvanBokhorst@dayrep.com. My address is 2657 Koontz Lane, Los Angeles, CA. My phone number is 818-828-6231.", "Tweet2": "My Social security number is 548-95-6370. My Bank account number is 940517528812 and routing number 195991012. My credit card number is 5534816011668430, Expiration Date 6/1/2022, my C V V code is 121, and my pin 123456. Well, I think that's it. You know a whole lot about me. And I hope that Amazon comprehend is doing a good job at identifying PII entities so you can redact my personal information away from this streaming record. Let's check"}

  6. 发送数据,并等待几秒钟,以便将记录发送到您的信息流。
  7. 几秒钟后,停止 KDG 生成器并检查您的 S3 存储桶以获取已交付的文件。

以下是原始 S3 存储桶中的原始数据示例:

{"User":"12345", "Tweet1":" Good morning, everybody. My name is Van Bokhorst Serdar, and today I feel like sharing a whole lot of personal information with you. Let's start with my Email address SerdarvanBokhorst@dayrep.com. My address is 2657 Koontz Lane, Los Angeles, CA. My phone number is 818-828-6231.", "Tweet2": "My Social security number is 548-95-6370. My Bank account number is 940517528812 and routing number 195991012. My credit card number is 5534816011668430, Expiration Date 6/1/2022, my C V V code is 121, and my pin 123456. Well, I think that's it. You know a whole lot about me. And I hope that Amazon comprehend is doing a good job at identifying PII entities so you can redact my personal information away from this streaming record. Let's check"}

以下是已编辑 S3 存储桶中已编辑数据的示例:

{"User":"12345", "Tweet1":"Good morning, everybody. My name is *******************, and today I feel like sharing a whole lot of personal information with you. Let's start with my Email address ****************************. My address is ********************************** My phone number is ************.", "Tweet"2: "My Social security number is ***********. My Bank account number is ************ and routing number *********. My credit card number is ****************, Expiration Date ********, my C V V code is ***, and my pin ******. Well, I think that's it. You know a whole lot about me. And I hope that Amazon comprehend is doing a good job at identifying PII entities so you can redact my personal information away from this streaming record. Let's check"}

敏感信息已从编辑消息中删除,让您确信您可以与终端系统共享此数据。

净化

完成对该解决方案的试验后,使用 AWS CloudFormation 控制台清理您的资源,以删除此示例中部署的所有资源。 如果您按照手动步骤操作,则需要手动删除两个存储桶,即 AmazonComprehendPII-Redact 功能, ComprehendRealTimeBlog 流,日志组 ComprehendRealTimeBlog 流,以及创建的任何 IAM 角色。

结论

这篇文章向您展示了如何将 PII 编辑集成到您的近实时流式架构中,并通过在飞行中执行编辑来减少数据处理时间。 在这种情况下,您将编辑后的数据提供给最终用户,并且数据湖管理员会保护原始存储桶以供以后使用。 您还可以使用 Amazon Comprehend 构建额外的处理来识别语气或情绪,识别数据中的实体,并对每条消息进行分类。

作为本文的一部分,我们为每项服务提供了单独的步骤,还包括一个 CloudFormation 模板,允许您在您的帐户中配置所需的资源。 此模板应仅用于概念证明或测试场景。 请参阅开发人员指南了解 亚马逊领悟, LAMBDAKinesis 数据流水线 任何服务限制。

要开始使用 PII 识别和编辑,请参阅 个人身份信息(PII). 通过本文中的示例架构,您可以使用 Kinesis Data Firehose 数据转换将任何 Amazon Comprehend API 与近乎实时的数据集成。 要详细了解您可以使用 Kinesis Data Firehose 使用近实时数据构建什么,请参阅 Amazon Kinesis Data Firehose 开发人员指南. 此解决方案在提供 Amazon Comprehend 和 Kinesis Data Firehose 的所有 AWS 区域均可用。


关于作者

乔莫罗蒂 是 Amazon Web Services (AWS) 的解决方案架构师,帮助美国中西部的企业客户。 他担任过广泛的技术职务,并喜欢向客户展示可能的艺术。 在空闲时间,他喜欢与家人共度美好时光,探索新地方并过度分析球队的表现

斯里哈什·阿达里 是 Amazon Web Services (AWS) 的高级解决方案架构师,他帮助客户从业务成果逆向工作,以在 AWS 上开发创新解决方案。 多年来,他帮助多个客户进行跨行业垂直的数据平台转型。 他的核心专业领域包括技术战略、数据分析和数据科学。 在业余时间,他喜欢打网球、疯狂地看电视节目和弹奏 Tabla。

现货图片

最新情报

现货图片

在线答疑

你好呀! 我怎么帮你?