Simplify your ETL and ML pipelines using the Amazon Athena UNLOAD feature

Many organizations prefer SQL for data preparation because they already have developers for extract, transform, and load (ETL) jobs and analysts preparing data for machine learning (ML) who understand and write SQL queries. Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

By default, Athena automatically writes SELECT query results in CSV format to Amazon S3. However, you might often have to write SELECT query results in non-CSV formats such as JSON, Parquet, and ORC for various use cases. In this post, we walk you through the UNLOAD statement in Athena and how it helps you implement several use cases, along with code snippets that you can use.

Athena UNLOAD overview

CSV is the only output format used by the Athena SELECT query, but you can use UNLOAD to write the output of a SELECT query in the formats and compression that UNLOAD supports. When you wrap a SELECT query in an UNLOAD statement, Athena writes the results to Amazon S3 in the data format you specify: Apache Parquet, ORC, Apache Avro, TEXTFILE, or JSON.

Although you can use the CTAS statement to output data in formats other than CSV, those statements also require the creation of a table in Athena. The UNLOAD statement is useful when you want to output the results of a SELECT query in a non-CSV format but don’t require the associated table. For example, a downstream application might require the results of a SELECT query to be in JSON format, and Parquet or ORC might provide a performance advantage over CSV if you intend to use the results of the SELECT query for additional analysis.
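The general shape of an UNLOAD statement can be sketched with a small string-building helper. This is a minimal illustration only: the function name `build_unload` and the example S3 path are assumptions made for this sketch, not part of Athena or any AWS SDK, and the helper does no validation of the format or compression values.

```python
def build_unload(query, location, fmt, compression=None, partitioned_by=None):
    """Return an Athena UNLOAD statement that wraps a SELECT query.

    Hypothetical helper for illustration; it only assembles text.
    """
    props = [f"format = '{fmt}'"]
    if compression:
        props.append(f"compression = '{compression}'")
    if partitioned_by:
        cols = ", ".join(f"'{c}'" for c in partitioned_by)
        props.append(f"partitioned_by = ARRAY[{cols}]")
    return f"UNLOAD ({query}) TO '{location}' WITH ({', '.join(props)})"


statement = build_unload(
    "SELECT * FROM catalog_returns",
    "s3://your-bucket/temp/athenaunload/example/",  # placeholder location
    "PARQUET",
    compression="SNAPPY",
)
print(statement)
```

The resulting text can be pasted into the Athena query editor or submitted through whichever client you already use to run Athena queries.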

In this post, we walk you through the following use cases for the UNLOAD feature:

  • Compress Athena query results to reduce storage costs and speed up performance for downstream consumers
  • Store query results in JSON file format for specific downstream consumers
  • Feed downstream Amazon SageMaker ML models that require files as input
  • Simplify ETL pipelines with AWS Step Functions without creating a table

Use case 1: Compress Athena query results

When you’re using Athena to process and create large volumes of data, storage costs can increase significantly if you don’t compress the data. Furthermore, uncompressed formats like CSV and JSON require you to store and transfer a large number of files across the network, which can increase IOPS and network costs. To reduce costs and improve the performance of downstream big data processing applications such as Spark applications, a best practice is to store Athena output in compressed columnar file formats such as ORC and Parquet.

You can use the UNLOAD statement in your Athena SQL statement to create compressed ORC and Parquet files. In this example, we use a 3 TB TPC-DS dataset to find all items returned between a store and a website. The following query joins four tables: item, store_returns, web_returns, and customer_address:

UNLOAD (
    select *
    from store_returns, item, web_returns, customer_address
    where sr_item_sk = i_item_sk and
          wr_item_sk = i_item_sk and
          wr_refunded_addr_sk = ca_address_sk
) to 's3://your-bucket/temp/athenaunload/usecase1/' with (
    format = 'PARQUET',
    compression = 'SNAPPY',
    partitioned_by = ARRAY['ca_location_type']
)

The query output, Snappy compressed and stored in Parquet format, was a 62 GB dataset. The same output in uncompressed CSV format was a 248 GB dataset. The Snappy compressed Parquet output was therefore 75% smaller, which saves storage costs and results in faster performance for downstream consumers.
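The 75% figure follows directly from the two sizes quoted above. As a quick check, using only the numbers reported in this post:

```python
parquet_gb = 62   # Snappy-compressed Parquet output size reported above
csv_gb = 248      # uncompressed CSV output size reported above

# Fraction of storage saved by writing Parquet instead of CSV.
reduction = 1 - parquet_gb / csv_gb
print(f"Storage reduction: {reduction:.0%}")
```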

Use case 2: Store query results in JSON file format

Some systems downstream of Athena, such as web applications or third-party systems, require data in JSON format. JSON is a text-based, self-describing representation of structured data that is based on key-value pairs. It’s lightweight and widely used as a data transfer mechanism by different services, tools, and technologies. In these use cases, the UNLOAD statement with the format parameter set to JSON can write files in JSON format to Amazon S3.

The following SQL extracts the returns data for a specific customer and return date from the 3 TB catalog_returns table and stores it in Amazon S3 in JSON format:

UNLOAD (
		select cr_returned_date_sk, cr_returning_customer_sk, cr_catalog_page_sk, cr_net_loss
		from catalog_returns
		where cr_returned_date_sk = 2450821 and cr_returning_customer_sk = 11026691
	) to 's3://your-bucket/temp/athenaunload/usecase2/' with (
		format = 'JSON', compression = 'NONE'
	)

By default, Athena uses Gzip compression for JSON and TEXTFILE formats. You can set the compression to NONE to store the UNLOAD result without any compression. The query result is stored as the following JSON file:

{"cr_returned_date_sk":2450821,"cr_returning_customer_sk":11026691,"cr_catalog_page_sk":20.8,"cr_net_loss":53.31}

The query result can now be consumed by a downstream web application.
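A downstream consumer could read such a file with a few lines of standard-library code. The sketch below assumes the output holds one JSON object per line, as in the sample record above, and that the file bytes have already been fetched from Amazon S3 by some other means. The function name `read_json_lines` is hypothetical. The `compressed` flag covers the case where the default Gzip compression was left on.

```python
import gzip
import json


def read_json_lines(raw_bytes, compressed=False):
    """Parse newline-delimited JSON records, optionally Gzip-compressed."""
    if compressed:
        raw_bytes = gzip.decompress(raw_bytes)
    text = raw_bytes.decode("utf-8")
    return [json.loads(line) for line in text.splitlines() if line.strip()]


# The sample record shown above, as it would appear in an uncompressed file.
sample = (
    b'{"cr_returned_date_sk":2450821,"cr_returning_customer_sk":11026691,'
    b'"cr_catalog_page_sk":20.8,"cr_net_loss":53.31}\n'
)

records = read_json_lines(sample)
gz_records = read_json_lines(gzip.compress(sample), compressed=True)
print(records[0]["cr_net_loss"])
```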

Use case 3: Feed downstream ML models

Analysts and data scientists rely on Athena for ad hoc SQL queries, data discovery, and analysis. They often like to quickly create derived columns such as aggregates or other features. These need to be written as files in Amazon S3 so a downstream ML model can directly read the files without having to rely on a table.

You can also parameterize repetitive queries using Athena prepared statements. Using the UNLOAD statement in a prepared statement gives less technical users, analysts, and data scientists a self-service way to export the files they need for downstream analysis without having to write queries.
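As a rough sketch of that pattern, the helpers below assemble the PREPARE and EXECUTE ... USING statements for a parameterized UNLOAD. The helper names, the statement name `returns_export`, and the S3 path are all hypothetical, and the literal rendering is deliberately simple (numbers bare, strings single-quoted). Treat it as an illustration of the statement shapes rather than a complete client.

```python
def sql_literal(value):
    """Render a Python value as a SQL literal (numbers bare, strings quoted)."""
    if isinstance(value, bool):
        raise TypeError("booleans are not handled in this sketch")
    if isinstance(value, (int, float)):
        return str(value)
    return "'" + str(value).replace("'", "''") + "'"


def prepare_unload(name, select_sql, location, fmt="PARQUET"):
    """Build a PREPARE statement whose body is an UNLOAD with ? placeholders."""
    return (
        f"PREPARE {name} FROM UNLOAD ({select_sql}) "
        f"TO '{location}' WITH (format = '{fmt}')"
    )


def execute_prepared(name, *values):
    """Build the EXECUTE statement that binds values to the placeholders."""
    return f"EXECUTE {name} USING " + ", ".join(sql_literal(v) for v in values)


prepare_sql = prepare_unload(
    "returns_export",
    "SELECT cr_returned_date_sk, cr_net_loss FROM catalog_returns "
    "WHERE cr_returned_date_sk = ? AND cr_returning_customer_sk = ?",
    "s3://your-bucket/temp/athenaunload/prepared/",  # placeholder location
)
execute_sql = execute_prepared("returns_export", 2450821, 11026691)
print(prepare_sql)
print(execute_sql)
```

An analyst would then only need to run the EXECUTE statement with new values, without editing the underlying query.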

In the following example, we create derived columns and engineer features for a downstream SageMaker ML model that predicts the best discount for catalog items in future promotions. We derive averages for quantity, list price, discount, and sales price for promotional items sold in stores where the promotion is not offered by mail or a special event. Then we restrict the results to a specific gender, marital status, and education status. We use the following query:

UNLOAD(
		Select i_item_id,
	        avg(ss_quantity) avg_sales,
	        avg(ss_list_price) avg_list_price,
	        avg(ss_coupon_amt) avg_coupon_amt,
	        avg(ss_sales_price) avg_sales_price
	 from store_sales, customer_demographics, date_dim, item, promotion
	 where cast(ss_sold_date_sk AS int) = d_date_sk and
	       ss_item_sk = i_item_sk and
	       ss_cdemo_sk = cd_demo_sk and
	       ss_promo_sk = p_promo_sk and
	       cd_gender = 'M' and
	       cd_marital_status = 'M' and
	       cd_education_status = '4 yr Degree' and
	       (p_channel_email = 'N' or p_channel_event = 'N') and
	       d_year = 2001
	 group by i_item_id
	 order by i_item_id
	) to 's3://your-bucket/temp/athenaunload/usecase3/' with (
		format = 'PARQUET',compression = 'SNAPPY'
	)

The output is written as Parquet files in Amazon S3 for a downstream SageMaker model training job to consume.

Use case 4: Simplify ETL pipelines with Step Functions

Step Functions is integrated with the Athena console to facilitate building workflows that include Athena queries and data processing operations. This helps you create repeatable and scalable data processing pipelines as part of a larger business application and visualize the workflows on the Athena console.

In this use case, we provide an example query result in Parquet format for downstream consumption. In this example, the raw data is in TSV format and gets ingested on a daily basis. We use the Athena UNLOAD statement to convert the data into Parquet format. After that, we send the location of the Parquet file as an Amazon Simple Notification Service (Amazon SNS) notification. Downstream applications can be notified via Amazon SNS to take further actions. One common example is to initiate an AWS Lambda function that uploads the Athena transformation result into Amazon Redshift.

The following diagram illustrates the ETL workflow.

The workflow includes the following steps:

  1. Start an AWS Glue crawler pointing to the raw S3 bucket. The crawler updates the metadata of the raw table with new files and partitions.
  2. Invoke a Lambda function to clean up the previous UNLOAD result. This step is required because UNLOAD doesn’t write data to the specified location if the location already has data in it (UNLOAD doesn’t overwrite existing data). To reuse a bucket location as a destination for UNLOAD, delete the data in the bucket location, and then run the query again. Another common pattern is to UNLOAD data to a new partition with incremental data processing.
  3. Start an Athena UNLOAD query to convert the raw data into Parquet.
  4. Send a notification to downstream data consumers when the file is updated.
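The alternative mentioned in step 2, unloading each run to a new location instead of cleaning up the old one, can be sketched as a per-run destination prefix. The `dt=` naming and the function name `unload_location` are assumptions for illustration. Any scheme works as long as each run gets a destination that holds no data yet.

```python
from datetime import date


def unload_location(bucket, base_prefix, run_date):
    """Build a per-run S3 destination such as s3://bucket/parquet/dt=2022-01-31/."""
    return f"s3://{bucket}/{base_prefix.strip('/')}/dt={run_date.isoformat()}/"


location = unload_location("your-bucket", "parquet/", date(2022, 1, 31))
print(location)
```

With a scheme like this, the cleanup step is only needed when a run for the same date has to be repeated.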

Set up resources with AWS CloudFormation

To prepare for querying both data sources, launch the provided AWS CloudFormation template:

Keep all the provided parameters and choose Create stack.

The CloudFormation template creates the following resources:

  • An Athena workgroup, etl-workgroup, which holds the Athena UNLOAD queries.
  • A data lake S3 bucket that holds the raw table. We use the Amazon Customer Reviews Dataset in this post.
  • An Athena output S3 bucket that holds the UNLOAD result and query metadata.
  • An AWS Glue database.
  • An AWS Glue crawler pointing to the data lake S3 bucket.
  • A LoadDataBucket Lambda function to load the Amazon Customer Reviews raw data into the S3 bucket.
  • A CleanUpS3Folder Lambda function to clean up previous Athena UNLOAD result.
  • An SNS topic to notify downstream systems when the UNLOAD is complete.

When the stack is fully deployed, navigate to the Outputs tab of the stack on the AWS CloudFormation console and note the values of the following resources:

  • AthenaWorkgroup
  • AthenaOutputBucket
  • CleanUpS3FolderLambda
  • GlueCrawler
  • SNSTopic

Build a Step Functions workflow

We use the Athena Workflows feature to build the ETL pipeline.

  1. On the Athena console, under Jobs in the navigation pane, choose Workflows.
  2. Under Create Athena jobs with Step Functions workflows, for Query large datasets, choose Get started.
  3. Choose Create your own workflow.
  4. Choose Continue.

The following is a screenshot of the default workflow. Compare the default workflow against the earlier ETL workflow we described. The default workflow doesn’t contain a Lambda function invocation and has an additional GetQueryResults step.

Next, we add a Lambda Invoke step.

  1. Search for Lambda Invoke in the search bar.
  2. Choose the Lambda: Invoke step and drag it above the Athena: StartQueryExecution step.
  3. Choose the Athena: GetQueryResults step (right-click) and choose Delete state.

  4. Now the workflow aligns with the earlier design.
  5. Choose the step Glue: StartCrawler.
  6. In the Configuration section, under API Parameters, enter the following JSON (provide the AWS Glue crawler name from the CloudFormation stack output):
    {
      "Name": "GlueCrawler"
    }

  7. Choose the step Glue: GetCrawler.
  8. In the Configuration section, under API Parameters, enter the following JSON:
    {
      "Name": "GlueCrawler"
    }

  9. Choose the step Lambda: Invoke.
  10. In the Configuration section, under API Parameters, for Function name, choose the function -CleanUpS3FolderLambda-.
  11. In the Payload section, enter the following JSON (include the Athena output bucket from the stack output):
    {
      "bucket_name": "AthenaOutputBucket",
      "prefix": "parquet/"
    }

  12. Choose the step Athena: StartQueryExecution.
  13. In the Configuration section on the right, under API Parameters, enter the following JSON (provide the Athena output bucket and workgroup name):
    {
      "QueryString": "UNLOAD (SELECT * FROM \"athena_unload_blog\".\"reviews\") TO 's3://AthenaOutputBucket/parquet' WITH (format = 'PARQUET', compression = 'SNAPPY')",
      "WorkGroup": "AthenaWorkgroup"
    }

Notice that the Wait for task to complete check box is selected. This pauses the workflow while the Athena query is running.
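The QueryString value embeds double-quoted identifiers, which have to be escaped inside a JSON string. One way to avoid hand-escaping is to generate the API parameters with a JSON serializer, as in the following sketch. The bucket and workgroup names are the same placeholders used in the steps above.

```python
import json

# The UNLOAD query as plain text, with ordinary double-quoted identifiers.
query = (
    'UNLOAD (SELECT * FROM "athena_unload_blog"."reviews") '
    "TO 's3://AthenaOutputBucket/parquet' "
    "WITH (format = 'PARQUET', compression = 'SNAPPY')"
)

# json.dumps escapes the embedded double quotes so the result is valid JSON.
api_parameters = json.dumps(
    {"QueryString": query, "WorkGroup": "AthenaWorkgroup"}, indent=2
)
print(api_parameters)
```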

  1. Choose the step SNS: Publish.
  2. In the Configuration section, under API Parameters, for Topic, choose the SNSTopic created by the CloudFormation template.
  3. In the Message section, enter the following JSON to pass the data manifest file location to the downstream consumer:
    {
      "Input.$": "$.QueryExecution.Statistics.DataManifestLocation"
    }

For more information, refer to the GetQueryExecution response syntax.
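Taken together, the configured steps correspond roughly to an Amazon States Language definition like the one built below. This is a sketch under several assumptions: the state names, the 30-second wait, the crawler polling loop (a Choice state on `$.Crawler.State`), and the SNS topic ARN are illustrative and not taken from the console-generated workflow, and the resource names are the stack output placeholders used above. Compare it against the code the console generates rather than using it verbatim.

```python
import json

# Placeholders; substitute the outputs of your CloudFormation stack.
CRAWLER = "GlueCrawler"
BUCKET = "AthenaOutputBucket"
WORKGROUP = "AthenaWorkgroup"
CLEANUP_FUNCTION = "CleanUpS3FolderLambda"
TOPIC_ARN = "arn:aws:sns:us-east-1:111122223333:SNSTopic"  # hypothetical ARN

QUERY = (
    'UNLOAD (SELECT * FROM "athena_unload_blog"."reviews") '
    f"TO 's3://{BUCKET}/parquet' WITH (format = 'PARQUET', compression = 'SNAPPY')"
)

definition = {
    "StartAt": "Start Crawler",
    "States": {
        "Start Crawler": {
            "Type": "Task",
            "Resource": "arn:aws:states:::aws-sdk:glue:startCrawler",
            "Parameters": {"Name": CRAWLER},
            "Next": "Get Crawler",
        },
        "Get Crawler": {
            "Type": "Task",
            "Resource": "arn:aws:states:::aws-sdk:glue:getCrawler",
            "Parameters": {"Name": CRAWLER},
            "Next": "Crawler Running?",
        },
        # Assumed polling loop: keep waiting while the crawler reports RUNNING.
        "Crawler Running?": {
            "Type": "Choice",
            "Choices": [
                {"Variable": "$.Crawler.State", "StringEquals": "RUNNING", "Next": "Wait"}
            ],
            "Default": "Clean Up Previous Result",
        },
        "Wait": {"Type": "Wait", "Seconds": 30, "Next": "Get Crawler"},
        "Clean Up Previous Result": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Parameters": {
                "FunctionName": CLEANUP_FUNCTION,
                "Payload": {"bucket_name": BUCKET, "prefix": "parquet/"},
            },
            "Next": "Run UNLOAD",
        },
        # The .sync suffix pauses the workflow until the query finishes.
        "Run UNLOAD": {
            "Type": "Task",
            "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
            "Parameters": {"QueryString": QUERY, "WorkGroup": WORKGROUP},
            "Next": "Notify",
        },
        "Notify": {
            "Type": "Task",
            "Resource": "arn:aws:states:::sns:publish",
            "Parameters": {
                "TopicArn": TOPIC_ARN,
                "Message": {"Input.$": "$.QueryExecution.Statistics.DataManifestLocation"},
            },
            "End": True,
        },
    },
}

print(json.dumps(definition, indent=2))
```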

  1. Choose Next.
  2. Review the generated code and choose Next.
  3. In the Permissions section, choose Create new role.
  4. Review the auto-generated permissions and choose Create state machine.
  5. In the Add additional permissions to your new execution role section, choose Edit role in IAM.
  6. Choose Add permissions, then choose Attach policies.
  7. Search for the AWSGlueConsoleFullAccess managed policy and attach it.

This policy grants full access to AWS Glue resources when using the AWS Management Console. In production, generate a policy based on access activity, following the principle of least privilege.

Test the workflow

Next, we test out the Step Functions workflow.

  1. On the Athena console, under Jobs in the navigation pane, choose Workflows.
  2. Under State machines, choose the workflow we just created.
  3. Choose Execute, then choose Start execution to start the workflow.
  4. Wait until the workflow completes, then verify there are UNLOAD Parquet files in the bucket AthenaOutputBucket.

Clean up

To help prevent unwanted charges to your AWS account, delete the AWS resources that you used in this post.

  1. On the Amazon S3 console, choose the -athena-unload-data-lake bucket.
  2. Select all files and folders and choose Delete.
  3. Enter permanently delete as directed and choose Delete objects.
  4. Repeat these steps to remove all files and folders in the -athena-unload-output bucket.
  5. On the AWS CloudFormation console, delete the stack that you created.
  6. Wait for the stack status to change to DELETE_COMPLETE.

Conclusion

In this post, we introduced the UNLOAD statement in Athena with some common use cases. We demonstrated how to compress Athena query results to reduce storage costs and improve performance, store query results in JSON file format, feed downstream ML models, and create and visualize ETL pipelines with Step Functions without creating a table.

To learn more, refer to the Athena UNLOAD documentation and Visualizing AWS Step Functions workflows from the Amazon Athena console.


About the Authors

Dylan Qu is a Specialist Solutions Architect focused on Big Data & Analytics with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS.

Harsha Tadiparthi is a Principal Solutions Architect focused on providing analytics and AI/ML strategies and solution designs to customers.
