제퍼넷 로고

Amazon MSK를 통한 Amazon Redshift 스트리밍 수집을 사용하여 거의 실시간 분석을 구현하는 모범 사례 | 아마존 웹 서비스

시간

아마존 레드 시프트 빠르고 간단하며 안전한 대규모 분석을 통해 통찰력을 얻는 시간을 가속화하는 확장 가능한 완전 관리형 클라우드 데이터 웨어하우스입니다. 수만 명의 고객이 Amazon Redshift를 사용하여 엑사바이트 규모의 데이터를 분석하고 복잡한 분석 쿼리를 실행함으로써 가장 널리 사용되는 클라우드 데이터 웨어하우스가 되었습니다. 데이터 웨어하우스 인프라를 관리할 필요 없이 모든 데이터에 대해 몇 초 만에 분석을 실행하고 확장할 수 있습니다.

당신은을 사용할 수 있습니다 Amazon Redshift 스트리밍 수집 거의 실시간으로 분석 데이터베이스를 업데이트하는 기능입니다. Amazon Redshift 스트리밍 수집을 사용하면 데이터 스트림 위에 직접 구체화된 뷰를 생성할 수 있어 데이터 파이프라인이 단순화됩니다. Amazon Redshift의 이 기능을 사용하면 SQL(구조적 쿼리 언어)을 사용하여 다음과 같은 데이터 스트림에 연결하고 데이터를 직접 수집할 수 있습니다. Amazon Kinesis 데이터 스트림 or Apache Kafka 용 Amazon Managed Streaming (Amazon MSK) 데이터를 스트리밍하고 Amazon Redshift로 직접 데이터를 가져옵니다.

이 게시물에서는 Amazon MSK와 함께 Amazon Redshift 스트리밍 수집을 사용하여 거의 실시간 분석을 구현하는 모범 사례에 대해 논의합니다.

솔루션 개요

Amazon Redshift 스트리밍 수집을 사용하여 MSK 주제의 데이터를 Amazon Redshift로 수집하는 예제 파이프라인을 살펴보겠습니다. 또한 Amazon Redshift에서 점 표기법을 사용하여 JSON 데이터 중첩을 해제하는 방법도 보여줍니다. 다음 다이어그램은 솔루션 아키텍처를 보여줍니다.

프로세스 흐름은 다음 단계로 구성됩니다.

  1. Redshift 클러스터에서 스트리밍 구체화된 뷰를 생성하여 MSK 주제의 라이브 스트리밍 데이터를 사용합니다.
  2. 수집된 MSK 주제에 대한 레코드 수준에서 Kafka 파티션과 Kafka 오프셋의 고유한 조합을 사용하여 저장 프로시저를 사용하여 변경 데이터 캡처(CDC)를 구현합니다.
  3. Redshift 클러스터에서 사용자 대상 테이블을 생성하고 점 표기법을 사용하여 JSON 문서를 스트리밍 구체화된 뷰에서 테이블의 데이터 열로 중첩 해제합니다. 정기적으로 저장 프로시저를 호출하여 새로운 데이터를 지속적으로 로드할 수 있습니다.
  4. 연결 설정 사이 아마존 퀵 사이트 대시보드와 Amazon Redshift를 통해 시각화 및 통찰력을 제공합니다.

이 게시물의 일부로 다음 주제에 대해서도 논의합니다.

  • Amazon MSK에서 Amazon Redshift로의 교차 계정 스트리밍 수집을 구성하는 단계
  • 스트리밍 구체화된 뷰에서 최적화된 성능을 달성하기 위한 모범 사례
  • Amazon Redshift 스트리밍 수집 오류를 추적하는 모니터링 기술

사전 조건

다음이 있어야합니다.

  • AWS 계정.
  • 사용 사례에 따라 다음 리소스 중 하나:
  • MSK 클러스터. 지침은 다음을 참조하세요. Amazon MSK 클러스터 생성.
  • 주제 데이터 생산자가 데이터를 게시할 수 있는 MSK 클러스터에서.
  • MSK 클러스터의 주제에 데이터를 쓰는 데이터 생산자입니다.

MSK 주제 설정 시 고려 사항

MSK 주제를 구성할 때 다음 고려 사항에 유의하세요.

  • MSK 주제의 이름이 128자 이하여야 합니다.
  • 이 글을 쓰는 시점에서는 압축된 데이터가 포함된 MSK 레코드를 Amazon Redshift에서 직접 쿼리할 수 없습니다. Amazon Redshift는 MSK 주제의 클라이언트 측 압축 데이터에 대한 기본 압축 해제 방법을 지원하지 않습니다.
  • 팔로우 모범 사례 MSK 클러스터를 설정하는 동안.
  • 스트리밍 수집 검토 한계 다른 고려 사항에 대해서는.

스트리밍 수집 설정

스트리밍 수집을 설정하려면 다음 단계를 완료하십시오.

  1. 설정 AWS 자격 증명 및 액세스 관리 (IAM) 스트리밍 수집에는 역할 및 신뢰 정책이 필요합니다. 지침은 다음을 참조하세요. IAM 설정 및 Kafka에서 스트리밍 수집 수행.
  2. 다음을 사용하여 데이터가 MSK 주제로 유입되는지 확인하세요. 아마존 클라우드 워치 통계 (예 : BytesOutPerSec).
  3. Amazon Redshift 콘솔에서 쿼리 편집기 v2를 실행하거나 원하는 SQL 클라이언트를 사용하여 다음 단계를 위해 Redshift 클러스터에 연결하세요. 다음 단계는 쿼리 편집기 v2에서 실행되었습니다.
  4. MSK 클러스터에 매핑할 외부 스키마를 만듭니다. 다음 명령문에서 IAM 역할 ARN과 MSK 클러스터 ARN을 바꿉니다.
    CREATE EXTERNAL SCHEMA custschema
    FROM MSK
    IAM_ROLE  'iam-role-arn' 
    AUTHENTICATION { none | iam }
    CLUSTER_ARN 'msk-cluster-arn';
    

  5. 선택적으로 주제 이름이 대소문자를 구분하는 경우 활성화해야 합니다. enable_case_sensitive_identifier Amazon Redshift에서 액세스할 수 있습니다. 대소문자를 구분하는 식별자를 사용하려면 다음을 설정하십시오. enable_case_sensitive_identifier 세션, 사용자 또는 클러스터 수준에서 true로 설정합니다.
    SET ENABLE_CASE_SENSITIVE_IDENTIFIER TO TRUE;

  6. MSK 주제의 스트림 데이터를 사용하기 위해 구체화된 뷰를 생성합니다.
    CREATE MATERIALIZED VIEW Orders_Stream_MV AS
    SELECT kafka_partition, 
     kafka_offset, 
     refresh_time,
     JSON_PARSE(kafka_value) as Data
    FROM custschema."ORDERTOPIC"
    WHERE CAN_JSON_PARSE(kafka_value);
    

메타데이터 열 kafka_value Amazon MSK에서 도착하는 정보는 다음 위치에 저장됩니다. 바바이트 Amazon Redshift의 형식입니다. 이 게시물에서는 다음을 사용합니다. JSON_PARSE 변환하는 함수 kafka_value슈퍼 데이터 유형. 당신은 또한 CAN_JSON_PARSE 유효하지 않은 JSON 레코드를 건너뛰고 JSON 구문 분석 실패로 인한 오류로부터 보호하기 위해 필터 조건의 함수입니다. 이 게시물의 뒷부분에서 향후 디버깅을 위해 잘못된 데이터를 저장하는 방법에 대해 논의합니다.

  1. Amazon Redshift가 MSK 주제에서 읽고 구체화된 보기에 데이터를 로드하도록 트리거하는 스트리밍 구체화된 보기를 새로 고칩니다.
    REFRESH MATERIALIZED VIEW Orders_Stream_MV;

자동 새로 고침 기능을 사용하도록 스트리밍 구체화된 뷰를 설정할 수도 있습니다. 데이터가 스트림에 도착하면 구체화된 뷰가 자동으로 새로 고쳐집니다. 보다 구체화된 뷰 생성 자동 새로 고침을 사용하여 구체화된 뷰를 생성하는 방법에 대한 지침을 참조하세요.

JSON 문서 중첩 해제

다음은 스트리밍 구체화된 뷰의 MSK 주제에서 SUPER 유형의 Data 열로 수집된 JSON 문서의 샘플입니다. Orders_Stream_MV:

{
   "EventType":"Orders",
   "OrderID":"103",
   "CustomerID":"C104",
   "CustomerName":"David Smith",
   "OrderDate":"2023-09-02",
   "Store_Name":"Store-103",
   "ProductID":"P004",
   "ProductName":"Widget-X-003",
   "Quatity":"5",
   "Price":"2500",
   "OrderStatus":"Initiated"
}

다음 코드에 표시된 대로 점 표기법을 사용하여 JSON 페이로드를 중첩 해제합니다.

SELECT 
    data."OrderID"::INT4 as OrderID
    ,data."ProductID"::VARCHAR(36) as ProductID
    ,data."ProductName"::VARCHAR(36) as ProductName
    ,data."CustomerID"::VARCHAR(36) as CustomerID
    ,data."CustomerName"::VARCHAR(36) as CustomerName
    ,data."Store_Name"::VARCHAR(36) as Store_Name
    ,data."OrderDate"::TIMESTAMPTZ as OrderDate
    ,data."Quatity"::INT4 as Quatity
    ,data."Price"::DOUBLE PRECISION as Price
    ,data."OrderStatus"::VARCHAR(36) as OrderStatus
    ,"kafka_partition"::BIGINT  
    ,"kafka_offset"::BIGINT
FROM orders_stream_mv;

다음 스크린샷은 중첩 해제 후 결과가 어떻게 나타나는지 보여줍니다.

JSON 문서에 배열이 있는 경우 다음을 사용하여 데이터 중첩을 해제하는 것을 고려하세요. 파티QL Amazon Redshift의 문. 자세한 내용은 섹션을 참조하세요. JSON 문서 중첩 해제 게시물에 Amazon Kinesis Data Streams 및 Amazon DynamoDB와 함께 Amazon Redshift 스트리밍 수집을 사용한 거의 실시간 분석.

증분 데이터 로드 전략

증분 데이터 로드를 구현하려면 다음 단계를 완료하세요.

  1. 최종 사용자가 시각화 및 비즈니스 분석에 사용할 Orders라는 테이블을 Amazon Redshift에서 생성합니다.
    CREATE TABLE public.Orders (
        orderid integer ENCODE az64,
        productid character varying(36) ENCODE lzo,
        productname character varying(36) ENCODE lzo,
        customerid character varying(36) ENCODE lzo,
        customername character varying(36) ENCODE lzo,
        store_name character varying(36) ENCODE lzo,
        orderdate timestamp with time zone ENCODE az64,
        quatity integer ENCODE az64,
        price double precision ENCODE raw,
        orderstatus character varying(36) ENCODE lzo
    ) DISTSTYLE AUTO;
    

다음으로 다음이라는 저장 프로시저를 만듭니다. SP_Orders_Load 스트리밍 구체화된 뷰에서 CDC를 구현하고 최종 뷰로 로드 Orders 테이블. 당신은 다음의 조합을 사용합니다 Kafka_PartitionKafka_Offset CDC를 구현하기 위해 스트리밍 구체화된 뷰에서 시스템 열로 사용할 수 있습니다. 이 두 열의 조합은 MSK 주제 내에서 항상 고유하므로 프로세스 중에 누락된 기록이 없는지 확인합니다. 저장 프로시저에는 다음 구성 요소가 포함되어 있습니다.

  • 대소문자를 구분하는 식별자를 사용하려면 다음을 설정하십시오. enable_case_sensitive_identifier 세션, 사용자 또는 클러스터 수준에서 true로 설정됩니다.
  • 자동 새로 고침이 활성화되지 않은 경우 스트리밍 구체화된 뷰를 수동으로 새로 고칩니다.
  • 라는 감사 테이블을 만듭니다. Orders_Streaming_Audit 저장 프로시저를 마지막으로 실행하는 동안 Orders 테이블에 로드된 파티션의 마지막 오프셋을 추적하는 것이 존재하지 않는 경우.
  • 중첩을 해제하고 새 데이터 또는 변경된 데이터만 라는 스테이징 테이블에 삽입합니다. Orders_Staging_Table, 스트리밍 구체화된 뷰에서 읽기 Orders_Stream_MV어디로 Kafka_Offset 마지막으로 처리된 것보다 큼 Kafka_Offset 감사 테이블에 기록됨 Orders_Streaming_Audit 위한 Kafka_Partition 처리 중.
  • 이 저장 프로시저를 사용하여 처음으로 로드하는 경우에는 데이터가 없습니다. Orders_Streaming_Audit 테이블과 모든 데이터 Orders_Stream_MV Orders 테이블에 로드됩니다.
  • 사용자가 보는 페이지에는 비즈니스와 관련된 열만 삽입하세요. Orders 테이블, 준비 테이블에서 선택 Orders_Staging_Table.
  • 최대값을 삽입하세요. Kafka_Offset 로드된 모든 항목에 대해 Kafka_Partition 감사 테이블에 Orders_Streaming_Audit

중간 준비 테이블을 추가했습니다. Orders_Staging_Table 예상치 못한 실패 및 추적 가능성이 있는 경우 디버깅을 돕기 위해 이 솔루션을 사용합니다. 준비 단계를 건너뛰고 최종 테이블에 직접 로드 Orders_Stream_MV 사용 사례에 따라 더 낮은 대기 시간을 제공할 수 있습니다.

  1. 다음 코드를 사용하여 저장 프로시저를 만듭니다.
    CREATE OR REPLACE PROCEDURE SP_Orders_Load()
        AS $$
        BEGIN
    
        SET ENABLE_CASE_SENSITIVE_IDENTIFIER TO TRUE;
        REFRESH MATERIALIZED VIEW Orders_Stream_MV;
    
        --create an audit table if not exists to keep track of Max Offset per Partition that was loaded into Orders table  
    
        CREATE TABLE IF NOT EXISTS Orders_Streaming_Audit
        (
        "kafka_partition" BIGINT,
        "kafka_offset" BIGINT
        )
        SORTKEY("kafka_partition", "kafka_offset"); 
    
        DROP TABLE IF EXISTS Orders_Staging_Table;  
    
        --Insert only newly available data into staging table from streaming View based on the max offset for new/existing partitions
      --When loading for 1st time i.e. there is no data in Orders_Streaming_Audit table then all the data gets loaded from streaming View  
        CREATE TABLE Orders_Staging_Table as 
        SELECT 
        data."OrderID"."N"::INT4 as OrderID
        ,data."ProductID"."S"::VARCHAR(36) as ProductID
        ,data."ProductName"."S"::VARCHAR(36) as ProductName
        ,data."CustomerID"."S"::VARCHAR(36) as CustomerID
        ,data."CustomerName"."S"::VARCHAR(36) as CustomerName
        ,data."Store_Name"."S"::VARCHAR(36) as Store_Name
        ,data."OrderDate"."S"::TIMESTAMPTZ as OrderDate
        ,data."Quatity"."N"::INT4 as Quatity
        ,data."Price"."N"::DOUBLE PRECISION as Price
        ,data."OrderStatus"."S"::VARCHAR(36) as OrderStatus
        , s."kafka_partition"::BIGINT , s."kafka_offset"::BIGINT
        FROM Orders_Stream_MV s
        LEFT JOIN (
        SELECT
        "kafka_partition",
        MAX("kafka_offset") AS "kafka_offset"
        FROM Orders_Streaming_Audit
        GROUP BY "kafka_partition"
        ) AS m
        ON nvl(s."kafka_partition",0) = nvl(m."kafka_partition",0)
        WHERE
        m."kafka_offset" IS NULL OR
        s."kafka_offset" > m."kafka_offset";
    
        --Insert only business relevant column to final table selecting from staging table
        Insert into Orders 
        SELECT 
        OrderID
        ,ProductID
        ,ProductName
        ,CustomerID
        ,CustomerName
        ,Store_Name
        ,OrderDate
        ,Quatity
        ,Price
        ,OrderStatus
        FROM Orders_Staging_Table;
    
        --Insert the max kafka_offset for every loaded Kafka partitions into Audit table 
        INSERT INTO Orders_Streaming_Audit
        SELECT kafka_partition, MAX(kafka_offset)
        FROM Orders_Staging_Table
        GROUP BY kafka_partition;   
    
        END;
        $$ LANGUAGE plpgsql;
    

  2. 저장 프로시저를 실행하여 데이터를 로드합니다. Orders 표:
    call SP_Orders_Load();

  3. Orders 테이블의 데이터를 검증합니다.

교차 계정 스트리밍 수집 설정

MSK 클러스터가 다른 계정에 속하는 경우 다음 단계를 완료하여 IAM 역할을 생성하여 교차 계정 스트리밍 수집을 설정합니다. 다음 다이어그램과 같이 Redshift 클러스터가 계정 A에 있고 MSK 클러스터가 계정 B에 있다고 가정해 보겠습니다.

다음 단계를 완료하십시오.

  1. 계정 B에서 다음과 같은 IAM 역할을 생성합니다. MyRedshiftMSKRole Amazon Redshift(계정 A)가 다음 이름의 MSK 클러스터(계정 B)와 통신할 수 있도록 허용합니다. MyTestCluster. MSK 클러스터가 IAM 인증을 사용하는지, 아니면 인증되지 않은 액세스를 사용하는지에 따라 다음 정책 중 하나를 사용하여 IAM 역할을 생성해야 합니다.
    • IAM policAmazonAmazon 인증되지 않은 액세스를 사용하는 MSK:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "RedshiftMSKPolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka:GetBootstrapBrokers"
                  ],
                  "Resource": "*"
              }
          ]
      }

    • IAM 인증 사용 시 Amazon MSK에 대한 IAM 정책:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "RedshiftMSKIAMpolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka-cluster:ReadData",
                      "kafka-cluster:DescribeTopic",
                      "kafka-cluster:Connect"
                  ],
                  "Resource": [
                      "arn:aws:kafka:us-east-1:0123456789:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1",
                      "arn:aws:kafka:us-east-1:0123456789:topic/MyTestCluster/*"
                  ]
              },
              {
                  "Sid": "RedshiftMSKPolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka:GetBootstrapBrokers"
                  ],
                  "Resource": "*"
              }
          ]
      }
      

이전 예의 리소스 섹션은 다음의 모든 주제에 대한 액세스를 제공합니다. MyTestCluster MSK 클러스터. IAM 역할을 특정 주제로 제한해야 하는 경우 주제 리소스를 더 제한적인 리소스 정책으로 바꿔야 합니다.

  1. 계정 B에서 IAM 역할을 생성한 후 IAM 역할 ARN(예: arn:aws:iam::0123456789:role/MyRedshiftMSKRole).
  2. 계정 A에서 다음과 같은 Redshift 사용자 지정 가능 IAM 역할을 생성합니다. MyRedshiftRole, Amazon MSK에 연결할 때 Amazon Redshift가 가정합니다. 역할에는 계정 A의 Amazon Redshift IAM 역할이 계정 B의 Amazon MSK 역할을 맡도록 허용하는 다음과 같은 정책이 있어야 합니다.
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "RedshiftMSKAssumePolicy",
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": "arn:aws:iam::0123456789:role/MyRedshiftMSKRole"        
           }
        ]
    }
    

  3. Amazon Redshift IAM 역할에 대한 역할 ARN을 기록해 두십시오(예: arn:aws:iam::9876543210:role/MyRedshiftRole).
  4. 계정 B로 돌아가서 IAM 역할의 신뢰 정책에 이 역할을 추가하세요. arn:aws:iam::0123456789:role/MyRedshiftMSKRole 계정 B가 계정 A의 IAM 역할을 신뢰하도록 허용합니다. 신뢰 정책은 다음 코드와 유사해야 합니다.
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": "sts:AssumeRole",
          "Principal": {
            "AWS": "arn:aws:iam::9876543210:role/MyRedshiftRole"
          }
        }
      ]
    } 
    

  5. Amazon Redshift 콘솔에 계정 A로 로그인합니다.
  6. 쿼리 편집기 v2 또는 원하는 SQL 클라이언트를 실행하고 다음 명령문을 실행하여 계정 B의 MSK 주제에 액세스합니다. MSK 클러스터에 매핑하려면 다음을 사용하여 외부 스키마를 생성합니다. 역할 연결 주위에 공백 없이 쉼표로 구분된 IAM 역할 ARN을 지정합니다. Redshift 클러스터에 연결된 역할은 체인에서 가장 먼저 옵니다.
    CREATE EXTERNAL SCHEMA custschema
    FROM MSK
    IAM_ROLE  
    'arn:aws:iam::9876543210:role/MyRedshiftRole,arn:aws:iam::0123456789:role/MyRedshiftMSKRole' 
    AUTHENTICATION { none | iam }
    CLUSTER_ARN 'msk-cluster-arn'; --replace with ARN of MSK cluster 
    

성능 고려 사항

다음 성능 고려 사항을 염두에 두십시오.

  • 스트리밍 구체화된 뷰를 단순하게 유지하고 중첩 해제, 집계 및 사례 표현식과 같은 변환을 이후 단계로 이동합니다. 예를 들어 스트리밍 구체화된 뷰 위에 또 다른 구체화된 뷰를 생성합니다.
  • 특정 MSK 주제에 대해 단일 Redshift 클러스터 또는 작업 그룹에서 하나의 스트리밍 구체화된 뷰만 생성하는 것을 고려하십시오. MSK 주제당 여러 구체화된 보기를 생성하면 각 구체화된 보기가 해당 주제에 대한 소비자가 되고 해당 주제에 대한 Amazon MSK 대역폭을 공유하므로 수집 성능이 저하될 수 있습니다. 스트리밍 구체화된 뷰의 라이브 스트리밍 데이터는 다음을 사용하여 여러 Redshift 클러스터 또는 Redshift Serverless 작업 그룹에서 공유될 수 있습니다. 데이터 공유.
  • 스트리밍 구체화된 뷰를 정의하는 동안 다음을 사용하지 마세요. Json_Extract_Path_Text 데이터를 미리 파쇄해야 하기 때문입니다. Json_extract_path_text 데이터를 행 단위로 작동하므로 수집 처리량에 큰 영향을 미칩니다. 데이터를 스트림에서 그대로 가져온 다음 나중에 파쇄하는 것이 좋습니다.
  • 가능한 경우 스트리밍 구체화된 뷰에서 정렬 키를 건너뛰어 수집 속도를 높이는 것을 고려하세요. 스트리밍 구체화된 뷰에 정렬 키가 있으면 스트림에서 수집된 데이터의 모든 배치에 대해 정렬 작업이 발생합니다. 정렬에는 정렬 키 데이터 유형, 정렬 키 열 수, 각 배치에서 수집된 데이터 양에 따라 성능이 저하됩니다. 이 정렬 단계는 스트리밍 데이터를 쿼리할 수 있기 전에 대기 시간을 늘릴 수 있습니다. 수집 시 지연 시간과 데이터 쿼리 시 지연 시간 중 어느 것이 더 중요한지 평가해야 합니다.
  • 스트리밍 구체화된 뷰의 성능을 최적화하고 스토리지 사용량을 줄이기 위해 때때로 다음을 사용하여 구체화된 뷰에서 데이터를 제거합니다. 삭제, 자르다테이블 추가 변경.
  • 여러 MSK 주제를 Amazon Redshift에 병렬로 수집해야 하는 경우 더 적은 수의 스트리밍 구체화된 보기로 시작하고 더 많은 구체화된 보기를 계속 추가하여 클러스터 또는 작업 그룹 내의 전반적인 수집 성능을 평가합니다.
  • Redshift 프로비저닝된 클러스터의 노드 수 또는 Redshift Serverless 작업 그룹의 기본 RPU를 늘리면 스트리밍 구체화된 뷰의 수집 성능을 높이는 데 도움이 될 수 있습니다. 최적의 성능을 위해서는 MSK 주제에 있는 파티션 수만큼 Redshift 프로비저닝된 클러스터에 슬라이스를 두거나 MSK 주제에 있는 파티션 8개당 XNUMX RPU를 갖는 것을 목표로 해야 합니다.

모니터링 기술

수집 시 대상 구체화된 뷰 열의 크기를 초과하는 주제의 레코드는 건너뜁니다. 구체화된 뷰 새로 고침으로 건너뛴 레코드는 SYS_STREAM_SCAN_ERRORS 시스템 테이블.

계산이나 데이터 유형 변환 또는 구체화된 뷰 정의의 다른 논리로 인해 레코드를 처리할 때 발생하는 오류로 인해 문제가 있는 레코드가 주제에서 만료될 때까지 구체화된 뷰 새로 고침이 실패하게 됩니다. 이러한 유형의 문제를 방지하려면 구체화된 뷰 정의의 논리를 주의 깊게 테스트하십시오. 그렇지 않으면 레코드를 기본 VARBYTE 열에 배치하고 나중에 처리하십시오.

사용 가능한 모니터링 보기는 다음과 같습니다.

  • SYS_MV_REFRESH_HISTORY – 이 보기를 사용하여 스트리밍 구체화된 보기의 새로 고침 기록에 대한 정보를 수집합니다. 결과에는 수동 또는 자동과 같은 새로 고침 유형과 가장 최근 새로 고침 상태가 포함됩니다. 다음 쿼리는 스트리밍 구체화된 뷰의 새로 고침 기록을 보여줍니다.
    select mv_name, refresh_type, status, duration  from SYS_MV_REFRESH_HISTORY where mv_name='mv_store_sales'

  • SYS_STREAM_SCAN_ERRORS – 이 보기를 사용하여 MSK 주제에서 스트리밍 수집을 통해 레코드를 로드하지 못한 이유를 확인하세요. 이 게시물을 작성하는 시점에서 Amazon MSK에서 수집할 때 이 보기는 레코드가 구체화된 보기 열 크기보다 큰 경우에만 오류를 기록합니다. 이 보기에는 위치 열에 있는 MSK 레코드의 고유 식별자(오프셋)도 표시됩니다. 다음 쿼리는 레코드가 최대 크기 제한을 초과한 경우 오류 코드 및 오류 이유를 보여줍니다.
    select mv_name, external_schema_name, stream_name, record_time, query_id, partition_id, "position", error_code, error_reason
    from SYS_STREAM_SCAN_ERRORS  where mv_name='test_mv' and external_schema_name ='streaming_schema'	;
    

  • SYS_STREAM_SCAN_STATES – 이 보기를 사용하여 특정 Record_time에 스캔된 레코드 수를 모니터링합니다. 이 보기는 일괄 처리에서 읽은 마지막 레코드의 오프셋도 추적합니다. 다음 쿼리는 특정 구체화된 뷰에 대한 주제 데이터를 표시합니다.
    select mv_name,external_schema_name,stream_name,sum(scanned_rows) total_records,
    sum(scanned_bytes) total_bytes 
    from SYS_STREAM_SCAN_STATES where mv_name='test_mv' and external_schema_name ='streaming_schema' group by 1,2,3;
    

  • SYS_QUERY_HISTORY – 이 보기를 사용하여 스트리밍 구체화된 보기 새로 고침에 대한 전체 지표를 확인합니다. 이는 또한 표시되지 않는 오류에 대해 error_message 열에 오류를 기록합니다. SYS_STREAM_SCAN_ERRORS. 다음 쿼리는 스트리밍 구체화된 뷰의 새로 고침 실패를 일으키는 오류를 보여줍니다.
    select  query_id, query_type, status, query_text, error_message from sys_query_history where status='failed' and start_time>='2024-02-03 03:18:00' order by start_time desc

구현을 위한 추가 고려 사항

스트리밍 구체화된 뷰 위에 구체화된 뷰를 선택적으로 생성할 수 있으므로 최종 사용자를 위해 결과를 중첩 해제하고 미리 계산할 수 있습니다. 이 접근 방식을 사용하면 저장 프로시저를 사용하여 결과를 최종 테이블에 저장할 필요가 없습니다.

이 게시물에서는 CAN_JSON_PARSE 함수 데이터를 더 성공적으로 수집하기 위해 오류를 방지합니다. 이 경우 구문 분석할 수 없는 스트리밍 레코드는 Amazon Redshift에서 건너뜁니다. 그러나 오류 레코드를 추적하려면 스트리밍 구체화된 뷰를 생성할 때 다음 SQL을 사용하여 오류 레코드를 열에 저장하는 것이 좋습니다.

CREATE MATERIALIZED VIEW Orders_Stream_MV AS 
SELECT
kafka_partition, 
kafka_offset, 
refresh_time, 
JSON_PARSE(kafka_value) as Data 
case when CAN_JSON_PARSE(kafka_value) = true then json_parse(kafka_value) end Data,
case when CAN_JSON_PARSE(kafka_value) = false then kafka_value end Invalid_Data
FROM custschema."ORDERTOPIC";

당신은 또한 고려할 수 있습니다 데이터 언로드 보기에서 SYS_STREAM_SCAN_ERRORS아마존 단순 스토리지 서비스 (Amazon S3) 버킷을 사용하여 알림을 받습니다. 이메일로 보고서 보내기 사용 아마존 단순 알림 서비스 (Amazon SNS) 새 S3 객체가 생성될 때마다 알림을 받습니다.

마지막으로 데이터 최신성 요구 사항에 따라 다음을 사용할 수 있습니다. 아마존 이벤트 브리지 앞서 언급한 호출을 위해 데이터 웨어하우스의 작업을 예약합니다. SP_Orders_Load 정기적으로 저장 프로시저를 사용합니다. EventBridge는 고정된 간격으로 이 작업을 수행하므로 메커니즘(예: AWS 단계 함수 상태 시스템) 프로시저에 대한 이전 호출이 완료되었는지 모니터링합니다. 자세한 내용은 다음을 참조하세요. 일정에 따라 실행되는 Amazon EventBridge 규칙 생성. 당신은 또한 참조 할 수 있습니다 AWS Step Functions 및 Amazon Redshift Data API를 사용하여 ELT 프로세스의 오케스트레이션 가속화. 또 다른 옵션은 다음을 사용하는 것입니다. Amazon Redshift 쿼리 편집기 v2 새로 고침을 예약합니다. 자세한 내용은 다음을 참조하세요. 쿼리 편집기 v2를 사용하여 쿼리 예약.

결론

이 게시물에서는 Amazon MSK와 함께 Amazon Redshift 스트리밍 수집을 사용하여 거의 실시간 분석을 구현하는 모범 사례에 대해 논의했습니다. 스트리밍 수집을 사용하여 MSK 주제의 데이터를 Amazon Redshift로 수집하는 예제 파이프라인을 보여 드렸습니다. 또한 Kafka Partition 및 Kafka Offset을 사용하여 Amazon Redshift에 증분 스트리밍 데이터 로드를 수행하는 안정적인 전략을 보여주었습니다. 또한 Amazon MSK에서 Amazon Redshift로의 교차 계정 스트리밍 수집을 구성하는 단계를 시연하고 최적화된 수집 속도를 위한 성능 고려 사항에 대해 논의했습니다. 마지막으로 Amazon Redshift 스트리밍 수집 오류를 추적하는 모니터링 기술에 대해 논의했습니다.

질문이 있으시면 댓글 섹션에 남겨주세요.


저자에 관하여

풀로미 다굽타 AWS의 수석 분석 솔루션 아키텍트입니다. 그녀는 고객이 비즈니스 문제를 해결하기 위해 클라우드 기반 분석 솔루션을 구축하도록 돕는 데 열정적입니다. 그녀는 일 외에는 여행을 하고 가족과 함께 시간을 보내는 것을 좋아합니다.

아데쿤레 아데도툰 Amazon Redshift 서비스의 선임 데이터베이스 엔지니어입니다. 그는 성능 튜닝에 중점을 두고 6년 동안 MPP 데이터베이스 작업을 해왔습니다. 그는 또한 새로운 서비스 기능과 기존 서비스 기능에 대해 개발 팀에 지침을 제공합니다.

spot_img

최신 인텔리전스

spot_img