实时更新动态数据 实验概述和目标 从物联网(IoT)设备流式传输数据的一个挑战是架构经常发生变化。例如,传感器可能首先发送四个值,然后是两个值,然后是十个值。对于许多传感器来说,这种情况可能会增加很多。适应不断变化的架构并更新数据湖中的记录可能是一个挑战,尤其是对于基于对象存储(如Amazon Simple Storage Service(Amazon S3))的数据湖来说。Apache Hudi Connector是AWS Marketplace上的一个开源工具,可以帮助您应对这个挑战。 在本实验中,您将使用Amazon S3、Amazon Athena、AWS Glue和Apache Hudi来解决动态更改架构的挑战。您将使用这些服务来实现高效的原地数据更新,并运行查询以获得接近实时的数据。 完成本实验后,您应该能够执行以下操作: 创建一个AWS Glue作业来运行自定义的抽取、转换和加载(ETL)脚本。 使用Athena运行查询。 使用Apache Hudi Connector执行原地更新。 持续时间 完成本实验需要大约90分钟。 AWS服务限制 在本实验环境中,对AWS服务和服务操作的访问可能受限于完成实验指令所需的服务。如果您尝试访问其他服务或执行超出本实验描述的操作,则可能会遇到错误。 情景 Mary是数据科学团队的成员,她处理大量来自物联网设备的流式数据。每次设备重置时,数据的大小和结构都会发生变化。通常只发送几个字段的设备偶尔可能发送多个字段。由于标准工具要求数据遵循特定的结构,因此处理这种情况非常复杂。此外,更改的数据应该只影响必需的行,而不是整个数据集。 您的挑战是开发一个概念验证(POC)来适应不断变化的架构,并只更新受影响的记录。 您决定使用带有自定义脚本的AWS Glue作业来处理动态架构,使用Apache Hudi Connector进行流式数据的原地更新。您将使用Athena在动态数据上运行类似SQL的查询,并使用Amazon S3作为数据湖。最后,您将使用Amazon Kinesis Data Streams来接收来自Amazon Kinesis Data Generator(KDG)生成的随机数据,该工具模拟物联网设备。 当您启动实验时,环境中将包含以下图示中显示的资源。 在实验结束时,您将创建如下图所示的架构。图后的表格提供了对架构的详细说明。 访问AWS管理控制台 在这些说明的顶部,选择“开始实验”。 实验会话开始。 页面顶部显示一个计时器,显示会话的剩余时间。 提示:要在任何时候刷新会话长度,请在计时器达到0:00之前再次选择“开始实验”。 在继续之前,请等待位于左上角的AWS链接右侧的圆形图标变为绿色。 要连接到AWS管理控制台,请选择左上角的AWS链接。 一个新的浏览器选项卡将打开,并将您连接到控制台。 提示:如果没有打开新的浏览器选项卡,则通常会在浏览器顶部显示一个横幅或图标,其中显示浏览器正在阻止该网站打开弹出窗口。选择横幅或图标,然后选择“允许弹出窗口”。 任务1:分析实验环境 在这个任务中,您将检查实验环境,并注意初始配置的详细信息。 检索在实验环境中创建的资源的值。 在“服务”右侧的搜索框中,搜索并选择“CloudFormation”以打开AWS CloudFormation控制台。 在堆栈列表中,选择描述中包含ADE的堆栈名称的链接。 选择“输出”选项卡。 输出列表显示了堆栈中一些资源的值,如下图所示。 将这些值复制到文本编辑器中,以备在实验中稍后使用。 下表简要描述了CloudFormation为此实验创建的一些资源。 任务2:订阅和激活Hudi连接器 在此任务中,您将配置Hudi连接器,AWS Glue作业将使用它与Amazon S3中的数据进行交互。通过此连接,您可以进行动态的原地数据更新。您将从AWS Marketplace配置此工具。 创建AWS Glue的Hudi连接器。 在“服务”右侧的搜索框中,搜索并选择“AWS Glue Studio”。 打开导航窗格(选择菜单图标),然后选择“Marketplace”。 在“搜索AWS Glue Studio产品”部分,搜索hudi。 选择“Apache Hudi Connector for AWS Glue”。 选择“继续订阅”。 选择“接受条款”。 等待“继续配置”按钮可用。 选择“继续配置”。 对于“履行选项”,选择“Glue 3.0”。 对于“软件版本”,选择“0.9.0(2022年2月17日)”。 选择“继续启动”。 选择“使用说明”。 在对话框顶部附近,选择“从AWS Glue Studio激活Glue连接器”。 AWS Glue Studio控制台会在新的浏览器选项卡或窗口中打开。您将在这里配置和激活连接器。 输入名称为“hudi-connection”。 选择“创建连接并激活连接器”。 要查看详细信息,请在“连接”部分选择“hudi-connection”链接。 在此任务中,您为AWS Glue准备了一个Hudi连接器,用于与S3存储桶进行交互。 任务3:配置AWS Glue的作业脚本 在此任务中,您将获取以下文件,这些文件用于配置和运行AWS Glue作业。 glue_job_script.py:这是AWS Glue作业将运行的Python脚本,用于执行原地数据更新。 glue_job.template:这是将用于创建AWS Glue作业的CloudFormation模板。 注意:选择右键保存/打开文件,并浏览内容。 打开AWS Cloud9终端并下载两个文件。 从您记录了CloudFormation堆栈输出的文本文件中,找到Cloud9URL值。将该URL粘贴到新的浏览器选项卡或窗口中,以打开AWS Cloud9终端。 等待终端提示显示voclabs:~/environment $。这可能需要几分钟的时间。 提示:为了方便起见,您可能想要关闭“Welcome”选项卡,并将终端窗口拖到页面的顶部。 要下载用于配置和运行AWS Glue作业的文件,请运行以下命令: ``` wget https://aws-tc-largeobjects.s3.us-west-2.amazonaws.com/CUR-TF-200-ACDSCI-1-66873/lab-06-hudi/s3/glue_job_script.py wget https://aws-tc-largeobjects.s3.us-west-2.amazonaws.com/CUR-TF-200-ACDSCI-1-66873/lab-06-hudi/s3/glue_job.template ``` 提示:要确认两个文件是否成功下载,可以运行ls命令来列出它们。 将文件复制到S3存储桶。 运行以下命令。在两个命令中,用CloudFormation输出中记录的存储桶名称替换<HUDIBucketName>。 ``` aws s3 cp glue_job_script.py s3://<HUDIBucketName>/artifacts/ aws s3 cp glue_job.template s3://<HUDIBucketName>/templates/ ``` 这些命令的输出类似于以下图像。 获取用于AWS Glue作业上传的CloudFormation模板的URL。 在“服务”右侧的搜索框中,搜索并选择“S3”以打开Amazon S3控制台。 选择包含ade-hudi-bucket的存储桶名称的链接。 选择“templates”链接。 选择“glue_job.template”,然后选择“复制URL”以复制模板的URL。将URL保存到您的文本编辑器中。 在此任务中,您配置了创建和运行AWS Glue作业所需的脚本。 任务4:配置和运行AWS Glue作业 AWS Glue提供了运行ETL作业以在各种数据源之间复制数据的功能。该服务可以运行自定义和标准的ETL脚本。在此任务中,您将使用CloudFormation模板配置AWS Glue作业。然后,您将运行AWS Glue作业,该作业将运行一个自定义的Python脚本。该脚本将从KDG工具消费数据,并与S3存储桶进行交互,根据需要插入或更新数据。 使用CloudFormation创建AWS Glue作业的堆栈。 导航到CloudFormation控制台。 选择“创建堆栈”>“使用新资源(标准)”。 对于“模板来源”,选择“Amazon S3 URL”。 对于“Amazon S3 URL”,粘贴先前从Amazon S3检索到的CloudFormation模板的URL。 选择“下一步”。 对于“堆栈名称”,输入“create-glue-job”。 对于“HudiARN”,粘贴先前从CloudFormation输出中记录的HUDIIamRoleARN值。 对于“LocalS3Bucket”,粘贴先前从CloudFormation输出中记录的HUDIBucketName值。 选择“下一步”。 再次选择“下一步”。 选择“创建堆栈”。 等待堆栈创建完成。这可能需要几分钟的时间。您可能需要刷新页面。 运行AWS Glue作业。 在“服务”右侧的搜索框中,搜索并选择“AWS Glue”以打开AWS Glue控制台。 在导航窗格中,选择ETL,然后选择“作业”。 在“您的作业”部分,选择包含Hudi_Streaming_Job的作业名称链接。 注意:您在上一步中使用的CloudFormation模板创建了此作业。您可以在“脚本”选项卡上查看Python代码。 要启动作业,请在右上角选择“运行”。 要查看作业的状态,请选择“运行”选项卡。 在继续之前,请确保运行状态为“运行中”。您可能需要刷新页面。 在此任务中,您配置并启动了AWS Glue作业。 任务5:使用KDG向Kinesis发送数据 在此任务中,您将使用Kinesis Data Generator(KDG)工具生成并发送随机数据到Kinesis。该工具将模拟从传感器发送数据的物联网设备。 访问KDG并开始发送数据。 从您从CloudFormation中记录的输出中找到KinesisDataGeneratorUrl值。将该URL粘贴到新的浏览器选项卡或窗口中,以打开KDG工具。 URL类似于https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html?upid=us-east-1_xrN3iZNu2&ipid=us-east-1:dad05a25-1c1a-4efd-b603-4b9e63912446&cid=3090bsfuesh8ui6qdjonu201n6&r=us-east-1。 使用以下凭据登录到KDG: 用户名:Mary 密码:Welcome1234 在登录后显示的页面中,配置以下内容: 区域:选择us-east-1。 流/传送流:选择hudi_demo_stream。 每秒记录数:选择Constant,并输入1。 对于记录模板,请选择Template 1并将其重命名为Schema 1。 将以下内容复制并粘贴到Schema 1代码块中。 ``` { "name" : "{{random.arrayElement(["Sensor1","Sensor2","Sensor3", "Sensor4"])}}", "date": "{{date.utc(YYYY-MM-DD)}}", "year": "{{date.utc(YYYY)}}", "month": "{{date.utc(MM)}}", "day": "{{date.utc(DD)}}", "column_to_update_integer": {{random.number(1000000000)}}, "column_to_update_string":"{{random.arrayElement(["45f","47f","44f", "48f"])}}" } ``` 选择“发送数据”。 一个窗口打开,显示数据正在发送到Kinesis。保持此浏览器选项卡或窗口打开,以便在实验进行过程中继续向Kinesis发送数据。 在此任务中,您配置了KDG并开始生成供Kinesis消费的数据。 任务6:使用Athena检查模式和查询数据 在此任务中,您将检查存储有来自KDG的数据的表的模式。您还将使用Athena对表运行查询。 检查表的模式。 导航到AWS Glue控制台。 在导航窗格中,选择“表”。 列出了两个表: hudi_demo_table:该表存储来自KDG的数据。 hudi_demo_kinesis_stream_table:AWS Glue作业创建了此表以适应NULL值。 注意:表需要几分钟的时间才会显示。 选择hudi_demo_table的链接。 表的模式显示,并且类似于以下图像。 分析:请注意模式中的四个分区列。这四列用于对存储此表数据的S3存储桶中的数据进行分区。如果您愿意,您可以检查S3存储桶中的这些分区。 配置Athena。 在“服务”右侧的搜索框中,搜索并选择“Athena”以打开Athena控制台。 在导航窗格中,选择“查询编辑器”。 注意:如果导航窗格已折叠,请选择菜单图标以展开它。 在“数据”面板中,对于数据库,请选择“hudi_demo_db”。 在“表和视图”部分,展开“hudi_demo_table”以显示模式。 注意:模式看起来与您在AWS Glue控制台中查看的时候相同。 选择页面顶部的“设置”选项卡。 选择“管理”。 在“查询结果位置”字段的右侧,选择“浏览S3”。 选择“ade-dsc-bucket-xxxx”。 注意:Athena查询的结果将存储在该存储桶中。 选择“选择”,然后选择“保存”。 返回到“编辑器”选项卡。 在“数据”面板中,对于“hudi_demo_table”的右侧,选择三个点图标,然后选择“预览表”。 注意右侧的查询选项卡中出现了以下查询:“SELECT * FROM“hudi_demo_db”。“hudi_demo_table”limit 10;”。 查询结果显示,并且类似于以下图像。 注意:KDG正在模拟物联网设备。在本例中,该工具正在模拟温度传感器。 运行Athena查询。 为了显示您感兴趣的属性,复制并粘贴以下查询到一个查询选项卡中,然后选择“运行”。 ``` SELECT _hoodie_commit_seqno, _hoodie_record_key, column_to_update_string FROM "hudi_demo_table" ``` 结果类似于以下图像。 多次运行查询,可以看到值的变化,如下图所示。 注意:观察传感器3发送的数据的差异。 分析:KDG每秒发送新数据。每次运行查询时,您都会获得一组新的随机记录。AWS Glue作业捕捉到数据的变化,并使用Hudi连接器将数据原地插入或更新到S3数据湖中。 在此任务中,您使用Athena查询了数据,并观察了数据的变化。 任务7:动态更改架构 在此任务中,您将更改KDG中数据的结构。然后,您将在Athena中运行查询并分析结果,而无需对AWS Glue作业或表结构进行任何更改。 更改架构并运行Athena查询。 返回到运行KDG工具的选项卡或窗口。 选择“停止向Kinesis发送数据”。 对于记录模板,请选择Template 2并将其重命名为Schema 2。 将以下内容复制并粘贴到Schema 2代码块中。 ``` { "name" : "{{random.arrayElement(["Sensor1","Sensor2","Sensor3", "Sensor4"])}}", "date": "{{date.utc(YYYY-MM-DD)}}", "year": "{{date.utc(YYYY)}}", "month": "{{date.utc(MM)}}", "day": "{{date.utc(DD)}}", "column_to_update_integer": {{random.number(1000000000)}}, "column_to_update_string": "{{random.arrayElement(["45f","47f","44f","48f"])}}", "new_column": "{{random.number(1000000000)}}" } ``` 注意:架构中包含了一个额外的列new_column。 选择“发送数据”,并保持KDG工具运行。 返回到Athena控制台,并刷新页面。 在“数据”面板中,在“表和视图”部分,展开“hudi_demo_table”以显示模式。 注意表中包含了额外的列new_column。 多次运行以下查询,并观察new_column值的变化。 ``` SELECT _hoodie_commit_seqno, _hoodie_record_key, column_to_update_string, new_column FROM "hudi_demo_table" ``` 结果类似于以下内容。 现在,再次运行查询并观察传感器3值的变化。 结果类似于以下内容。 分析:当将架构更改为添加新列时,AWS Glue作业依赖于Hudi内置的架构演进功能。这些功能使得可以更新AWS Glue数据目录以添加新列。Hudi还在输出文件中(写入Amazon S3的Parquet文件)添加了额外的列。这使得查询引擎(Athena)可以查询具有额外列的Hudi数据集,而不会出现任何问题。有关更多信息,请参阅Apache Hudi网站上的架构演化。 在这个任务中,您修改了架构,并观察了AWS Glue作业如何处理这些变化。在修改架构后,您可以运行Athena查询并进行数据分析,没有任何问题。 任务8:恢复架构更改 在这个最后的任务中,您将恢复架构,并验证您可以继续进行数据分析而没有任何问题。 更改架构并运行Athena查询。 返回到运行KDG工具的选项卡或窗口。 选择“停止向Kinesis发送数据”。 对于记录模板,请选择Schema 1。 选择“发送数据”。 选择“发送数据”,并保持KDG工具运行。 返回到Athena控制台,并刷新页面。 在“数据”面板中,在“表和视图”部分,展开“hudi_demo_table”以显示模式。 注意到额外的列new_column仍然包含在表中。 多次运行以下查询。 ``` SELECT _hoodie_commit_seqno, _hoodie_record_key, column_to_update_string, new_column FROM "hudi_demo_table" ``` 注意到在查询结果中仍然包含new_column;然而,该列不包含任何值。 分析:在您再次更改架构并从数据中删除new_column后,AWS Glue作业中的Python脚本处理了记录布局不匹配的情况。 该方法针对每个待摄取的记录查询AWS Glue数据目录,并获取当前Hudi表的架构。然后,它将Hudi表的架构与待摄取记录的架构合并,并在该架构中用null值进行了增强以适应new_column。这使得Athena可以无问题地查询Hudi数据集。 在此任务中,您恢复了架构,并观察到记录被原地更新。 团队更新 恭喜!在此实验中,您创建了Hudi连接,并使用自定义的Python脚本从Kinesis Data Streams中读取数据。您使用Athena对数据运行了查询,并实时查看了变化。您还更改了架构,并观察到Athena查询无缝运行,并返回了预期的数据。 您的POC成功地演示了如何处理动态数据变化并适应数据结构的变化。