제퍼넷 로고

AWS Glue와 Amazon MWAA를 결합하여 고급 VPC 선택 및 장애 조치 전략 구축 | 아마존 웹 서비스

시간

AWS 접착제 분석, 기계 학습(ML) 및 애플리케이션 개발을 위해 여러 소스의 데이터를 쉽게 검색, 준비, 이동 및 통합할 수 있게 해주는 서버리스 데이터 통합 ​​서비스입니다.

AWS Glue 고객은 엄격한 보안 요구 사항을 충족해야 하는 경우가 많습니다. 여기에는 작업에 허용되는 네트워크 연결을 잠그거나 다른 서비스에 액세스하기 위해 특정 VPC 내에서 실행하는 것이 포함됩니다. VPC 내부에서 실행하려면 작업을 단일 서브넷에 할당해야 하지만 가장 적합한 서브넷은 시간이 지남에 따라(예: 사용량 및 가용성에 따라) 변경될 수 있으므로 런타임에 결정을 내리는 것이 좋습니다. 자신만의 전략으로.

Apache Airflow용 Amazon 관리형 워크플로 (Amazon MWAA)는 관리형 Airflow 워크플로를 실행하는 AWS 서비스입니다. 이를 통해 AWS Glue 작업과 같은 작업이 실행되는 방식을 조정하는 사용자 지정 논리를 작성할 수 있습니다.

이 게시물에서는 런타임 시 작업에 할당된 VPC 서브넷을 동적으로 구성하여 선택하여 Airflow 워크플로의 일부로 AWS Glue 작업을 실행하는 방법을 보여줍니다.

솔루션 개요

VPC 내부에서 실행하려면 AWS Glue 작업에 최소한 네트워크 구성을 포함하는 연결을 할당해야 합니다. 모든 연결에서는 VPC, 서브넷 및 보안 그룹을 지정할 수 있지만 단순화를 위해 이 게시물에서는 네트워크 구성만 정의하고 외부 시스템을 포함하지 않는 NETWORK 유형의 연결을 사용합니다.

작업에 단일 연결로 할당된 고정 서브넷이 있는 경우, 가용 영역 또는 다른 이유로 서브넷을 사용할 수 없는 경우 작업을 실행할 수 없습니다. 또한 AWS Glue 작업의 각 노드(드라이버 또는 작업자)에는 서브넷에서 할당된 IP 주소가 필요합니다. 많은 대규모 작업을 동시에 실행하는 경우 IP 주소가 부족하여 작업이 의도한 것보다 적은 수의 노드로 실행되거나 전혀 실행되지 않을 수 있습니다.

AWS Glue ETL(추출, 변환 및 로드) 작업을 사용하면 여러 네트워크 구성으로 여러 연결을 지정할 수 있습니다. 그러나 작업은 항상 나열된 순서대로 연결의 네트워크 구성을 사용하려고 시도하고 해당 구성을 통과하는 첫 번째 구성을 선택합니다. 건강 수첩 작업을 시작하는 데 필요한 IP 주소가 두 개 이상 있으며 이는 최적의 옵션이 아닐 수 있습니다.

이 솔루션을 사용하면 연결 순서를 동적으로 재정렬하고 선택 우선순위를 정의하여 해당 동작을 향상하고 사용자 정의할 수 있습니다. 재시도가 필요한 경우 마지막 실행 이후 조건이 변경되었을 수 있으므로 전략에 따라 연결 우선 순위가 다시 지정됩니다.

결과적으로, 네트워크 보안 및 연결 요구 사항을 충족하는 동시에 서브넷 IP 주소 부족이나 중단으로 인해 작업이 실행되지 않거나 용량 부족으로 실행되는 것을 방지하는 데 도움이 됩니다.

다음 다이어그램은 솔루션 아키텍처를 보여줍니다.

사전 조건

게시물의 단계를 따르려면 로그인할 수 있는 사용자가 필요합니다. AWS 관리 콘솔 Amazon MWAA에 액세스할 수 있는 권한이 있습니다. 아마존 가상 프라이빗 클라우드 (Amazon VPC) 및 AWS Glue. 솔루션을 배포하기로 선택한 AWS 리전에는 VPC와 탄력적 IP 주소 2개를 생성할 수 있는 용량이 필요합니다. 두 리소스 유형 모두에 대한 기본 지역 할당량은 5개이므로 콘솔을 통해 증가를 요청해야 할 수도 있습니다.

당신은 또한 필요합니다 AWS 자격 증명 및 액세스 관리 (IAM) 역할이 아직 없는 경우 AWS Glue 작업을 실행하는 데 적합합니다. 지침은 다음을 참조하세요. AWS Glue에 대한 IAM 역할 생성.

Airflow 환경 및 VPC 배포

먼저 2개의 퍼블릭 서브넷과 2개의 프라이빗 서브넷이 있는 새 VPC 생성을 포함하여 새로운 Airflow 환경을 배포합니다. 이는 Amazon MWAA에 가용 영역 장애 허용이 필요하기 때문에 해당 지역의 서로 다른 두 가용 영역에 있는 두 개의 서브넷에서 실행되어야 하기 때문입니다. 퍼블릭 서브넷은 NAT 게이트웨이가 프라이빗 서브넷에 대한 인터넷 액세스를 제공할 수 있도록 사용됩니다.

다음 단계를 완료하십시오.

  1. 를 생성 AWS 클라우드 포메이션 다음에서 템플릿을 복사하여 컴퓨터에 저장하세요. 빠른 시작 가이드 로컬 텍스트 파일로.
  2. AWS CloudFormation 콘솔에서 스택 탐색 창에서
  3. 왼쪽 메뉴에서 스택 생성 옵션으로 새로운 리소스 사용(표준).
  4. 왼쪽 메뉴에서 템플릿 파일 업로드 로컬 템플릿 파일을 선택합니다.
  5. 왼쪽 메뉴에서 다음 보기.
  6. 설정 단계를 완료하고 환경 이름을 입력하고 나머지 매개변수는 기본값으로 둡니다.
  7. 마지막 단계에서 리소스가 생성될 것임을 확인하고 선택합니다. 문의하기.

스택 상태가 다음으로 변경될 때까지 생성에는 20~30분이 걸릴 수 있습니다. CREATE_COMPLETE.

가장 많은 시간이 소요되는 리소스는 Airflow 환경입니다. 생성되는 동안 Airflow UI를 열어야 할 때까지 다음 단계를 계속할 수 있습니다.

  1. 스택에서 자료 탭에서 VPC와 2개의 프라이빗 서브넷의 ID(PrivateSubnet1PrivateSubnet2) 다음 단계에서 사용합니다.

AWS Glue 연결 생성

CloudFormation 템플릿은 2개의 프라이빗 서브넷을 배포합니다. 이 단계에서는 AWS Glue 작업이 실행될 수 있도록 각각에 대한 AWS Glue 연결을 생성합니다. Amazon MWAA는 최근 공유 VPC에서 Airflow 클러스터를 실행할 수 있는 용량을 추가하여 비용을 절감하고 네트워크 관리를 단순화했습니다. 자세한 내용은 다음을 참조하세요. Amazon MWAA의 공유 VPC 지원 소개.

연결을 생성하려면 다음 단계를 완료하십시오.

  1. AWS Glue 콘솔에서 데이터 연결 탐색 창에서
  2. 왼쪽 메뉴에서 연결 만들기.
  3. 왼쪽 메뉴에서 네트워크 데이터 소스로.
  4. VPC와 프라이빗 서브넷(PrivateSubnet1) CloudFormation 스택에 의해 생성됩니다.
  5. 기본 보안 그룹을 사용하십시오.
  6. 왼쪽 메뉴에서 다음 보기.
  7. 연결 이름으로 다음을 입력합니다. MWAA-Glue-Blog-Subnet1.
  8. 세부정보를 검토하고 생성을 완료합니다.
  9. 다음을 사용하여 이 단계를 반복합니다. PrivateSubnet2 연결 이름을 지정하고 MWAA-Glue-Blog-Subnet2.

AWS Glue 작업 생성

이제 나중에 Airflow 워크플로에 의해 트리거될 AWS Glue 작업을 생성합니다. 작업은 이전 섹션에서 생성된 연결을 사용하지만 일반적으로 작업에 직접 할당하는 대신 이 시나리오에서는 작업 연결 목록을 비워 두고 워크플로가 런타임에 사용할 연결을 결정하도록 합니다.

이 경우 작업 스크립트는 중요하지 않으며 단지 연결에 따라 서브넷 중 하나에서 실행된 작업을 보여주기 위한 것입니다.

  1. AWS Glue 콘솔에서 ETL 작업 탐색 창에서 다음을 선택합니다. 스크립트 편집기.
  2. 기본 옵션(Spark 엔진 및 신선한 시작) 및 선택 스크립트 만들기.
  3. 자리 표시자 스크립트를 다음 Python 코드로 바꿉니다.
    import ipaddress
    import socket
    
    subnets = {
        "PrivateSubnet1": "10.192.20.0/24",
        "PrivateSubnet2": "10.192.21.0/24"
    }
    
    ip = socket.gethostbyname(socket.gethostname())
    subnet_name = "unknown"
    for subnet, cidr in subnets.items():
        if ipaddress.ip_address(ip) in ipaddress.ip_network(cidr):
            subnet_name = subnet
    
    print(f"The driver node has been assigned the ip: {ip}"
          + f" which belongs to the subnet: {subnet_name}")
    

  4. 작업 이름을 다음으로 바꿉니다. AirflowBlogJob.
  5. 직업 세부 정보 탭, IAM 역할, 역할을 선택하고 작업자 수로 2를 입력합니다(절약을 위해).
  6. 작업이 생성되도록 이러한 변경 사항을 저장합니다.

Airflow 환경 역할에 AWS Glue 권한 부여

CloudFormation 템플릿을 통해 Airflow에 대해 생성된 역할은 워크플로를 실행하기 위한 기본 권한을 제공하지만 AWS Glue와 같은 다른 서비스와 상호 작용할 수는 없습니다. 프로덕션 프로젝트에서는 이러한 추가 권한을 사용하여 자체 템플릿을 정의하지만, 이 게시물에서는 단순화를 위해 추가 권한을 인라인 정책으로 추가합니다. 다음 단계를 완료하세요.

  1. IAM 콘솔에서 역할 탐색 창에서
  2. 템플릿에서 생성된 역할을 찾습니다. CloudFormation 스택에 할당한 이름으로 시작한 다음 -MwaaExecutionRole-.
  3. 역할 세부정보 페이지에서 권한 추가 메뉴, 선택 인라인 정책 만들기.
  4. 시각적 모드에서 JSON 모드로 전환하고 텍스트 상자에 다음 JSON을 입력합니다. 귀하가 보유한 AWS Glue 역할은 다음으로 시작하는 규칙을 따른다고 가정합니다. AWSGlueServiceRole. 보안 강화를 위해 다음에서 와일드카드 리소스를 바꿀 수 있습니다. ec2:DescribeSubnets CloudFormation 스택에 있는 두 프라이빗 서브넷의 ARN에 대한 권한입니다.
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "glue:GetConnection"
                ],
                "Resource": [
                    "arn:aws:glue:*:*:connection/MWAA-Glue-Blog-Subnet*",
                    "arn:aws:glue:*:*:catalog"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "glue:UpdateJob",
                    "glue:GetJob",
                    "glue:StartJobRun",
                    "glue:GetJobRun"
                ],
                "Resource": [
                    "arn:aws:glue:*:*:job/AirflowBlogJob",
                    "arn:aws:glue:*:*:job/BlogAirflow"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "ec2:DescribeSubnets"
                ],
                "Resource": "*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "iam:GetRole",
                    "iam:PassRole"
                ],
                "Resource": "arn:aws:iam::*:role/service-role/AWSGlueServiceRole*"
            }
        ]
    }
    

  5. 왼쪽 메뉴에서 다음 보기.
  6. 엔터 버튼 GlueRelatedPermissions 정책 이름으로 입력하고 생성을 완료합니다.

이 예에서는 ETL 스크립트 작업을 사용합니다. 시각적 작업의 경우 저장 시 자동으로 스크립트를 생성하므로 Airflow 역할에는 구성된 스크립트 경로에 쓸 수 있는 권한이 필요합니다. 아마존 단순 스토리지 서비스 (아마존 S3).

Airflow DAG 만들기

Airflow 워크플로는 관련된 다양한 작업과 상호 종속성을 프로그래밍 방식으로 지정하는 Python 파일로 정의되는 DAG(방향성 비순환 그래프)를 기반으로 합니다. DAG를 만들려면 다음 스크립트를 완료하세요.

  1. 이름이 지정된 로컬 파일 만들기 glue_job_dag.py 텍스트 편집기를 사용하여.

다음 각 단계에서는 파일에 입력할 수 있는 코드 조각과 그 기능에 대한 설명을 제공합니다.

  1. 다음 코드 조각은 필수 Python 모듈 가져오기를 추가합니다. 모듈은 이미 Airflow에 설치되어 있습니다. 그렇지 않은 경우에는 다음을 사용해야 합니다. requirements.txt 설치할 모듈을 Airflow에 나타내는 파일입니다. 또한 나중에 코드에서 사용할 Boto3 클라이언트도 정의합니다. 기본적으로 Airflow와 동일한 역할 및 지역을 사용하므로 필요한 추가 권한을 사용하여 역할을 설정하기 전에 설정해야 합니다.
    import boto3
    from pendulum import datetime, duration
    from random import shuffle
    from airflow import DAG
    from airflow.decorators import dag, task
    from airflow.models import Variable
    from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
    
    glue_client = boto3.client('glue')
    ec2 = boto3.client('ec2')
    

  2. 다음 코드 조각은 연결 순서 전략을 구현하는 세 가지 기능을 추가합니다. 이 전략은 우선 순위를 설정하기 위해 주어진 연결을 재정렬하는 방법을 정의합니다. 이것은 단지 예일 뿐입니다. 필요에 따라 사용자 정의 코드를 작성하여 고유한 논리를 구현할 수 있습니다. 코드는 먼저 각 연결 서브넷에서 사용 가능한 IP를 확인하고 전체 용량으로 작업을 실행하는 데 사용할 수 있는 충분한 IP가 있는 IP와 작업에 필요한 최소 IP가 두 개 이상 사용 가능하기 때문에 사용할 수 있는 IP를 구분합니다. 시작. 전략이 설정되어 있다면 random, 이전에 설명한 각 연결 그룹 내에서 순서를 무작위로 지정하고 다른 연결을 추가합니다. 전략이라면 capacity, 대부분의 무료 IP부터 가장 적은 IP 순으로 순서를 지정합니다.
    def get_available_ips_from_connection(glue_connection_name):
        conn_response = glue_client.get_connection(Name=glue_connection_name)
        connection_properties = conn_response['Connection']['PhysicalConnectionRequirements']
        subnet_id = connection_properties['SubnetId']
        subnet_response = ec2.describe_subnets(SubnetIds=[subnet_id])
        return subnet_response['Subnets'][0]['AvailableIpAddressCount']
    
    def get_connections_free_ips(glue_connection_names, num_workers):
        good_connections = []
        usable_connections = []    
        for connection_name in glue_connection_names:
            try:
                available_ips = get_available_ips_from_connection(connection_name)
                # Priority to connections that can hold the full cluster and we haven't just tried
                if available_ips >= num_workers:
                    good_connections.append((connection_name, available_ips))
                elif available_ips >= 2: # The bare minimum to start a Glue job
                    usable_connections.append((connection_name, available_ips))                
            except Exception as e:
                print(f"[WARNING] Failed to check the free ips for:{connection_name}, will skip. Exception: {e}")  
        return good_connections, usable_connections
    
    def prioritize_connections(connection_list, num_workers, strategy):
        (good_connections, usable_connections) = get_connections_free_ips(connection_list, num_workers)
        print(f"Good connections: {good_connections}")
        print(f"Usable connections: {usable_connections}")
        all_conn = []
        if strategy=="random":
            shuffle(good_connections)
            shuffle(usable_connections)
            # Good connections have priority
            all_conn = good_connections + usable_connections
        elif strategy=="capacity":
            # We can sort both at the same time
            all_conn = good_connections + usable_connections
            all_conn.sort(key=lambda x: -x[1])
        else: 
            raise ValueError(f"Unknown strategy specified: {strategy}")    
        result = [c[0] for c in all_conn] # Just need the name
        # Keep at the end any other connections that could not be checked for ips
        result += [c for c in connection_list if c not in result]
        return result
    

  3. 다음 코드는 전략에 정의된 연결 순서로 작업을 업데이트하고 실행하고 결과를 기다리는 작업 실행 작업을 사용하여 DAG 자체를 생성합니다. 작업 이름, 연결, 전략은 Airflow 변수에서 가져오므로 쉽게 구성하고 업데이트할 수 있습니다. 지수 백오프가 구성된 두 번의 재시도가 있으므로 작업이 실패하면 연결 선택을 포함한 전체 작업을 반복합니다. 아마도 이제 최선의 선택은 다른 연결일 수도 있고, 이전에 무작위로 선택한 서브넷이 현재 중단된 가용 영역에 있고, 다른 연결을 선택하여 복구할 수도 있습니다.
    with DAG(
        dag_id='glue_job_dag',
        schedule_interval=None, # Run on demand only
        start_date=datetime(2000, 1, 1), # A start date is required
        max_active_runs=1,
        catchup=False
    ) as glue_dag:
        
        @task(
            task_id="glue_task", 
            retries=2,
            retry_delay=duration(seconds = 30),
            retry_exponential_backoff=True
        )
        def run_job_task(**ctx):    
            glue_connections = Variable.get("glue_job_dag.glue_connections").strip().split(',')
            glue_jobname = Variable.get("glue_job_dag.glue_job_name").strip()
            strategy= Variable.get('glue_job_dag.strategy', 'random') # random or capacity
            print(f"Connections available: {glue_connections}")
            print(f"Glue job name: {glue_jobname}")
            print(f"Strategy to use: {strategy}")
            job_props = glue_client.get_job(JobName=glue_jobname)['Job']            
            num_workers = job_props['NumberOfWorkers']
            
            glue_connections = prioritize_connections(glue_connections, num_workers, strategy)
            print(f"Running Glue job with the connection order: {glue_connections}")
            existing_connections = job_props.get('Connections',{}).get('Connections', [])
            # Preserve other connections that we don't manage
            other_connections = [con for con in existing_connections if con not in glue_connections]
            job_props['Connections'] = {"Connections": glue_connections + other_connections}
            # Clean up properties so we can reuse the dict for the update request
            for prop_name in ['Name', 'CreatedOn', 'LastModifiedOn', 'AllocatedCapacity', 'MaxCapacity']:
                del job_props[prop_name]
    
            GlueJobOperator(
                task_id='submit_job',
                job_name=glue_jobname,
                iam_role_name=job_props['Role'].split('/')[-1],
                update_config=True,
                create_job_kwargs=job_props,
                wait_for_completion=True
            ).execute(ctx)   
            
        run_job_task()
    

Airflow 워크플로 만들기

이제 방금 생성한 AWS Glue 작업을 호출하는 워크플로를 생성합니다.

  1. Amazon S3 콘솔에서 CloudFormation 템플릿으로 생성된 버킷을 찾습니다. 이 버킷의 이름은 스택 이름으로 시작하고 -environmentbucket- (예 : myairflowstack-environmentbucket-ap1qks3nvvr4).
  2. 해당 버킷 안에 다음과 같은 폴더를 만듭니다. dags, 해당 폴더 안에 DAG 파일을 업로드합니다. glue_job_dag.py 이전 섹션에서 만든 것입니다.
  3. Amazon MWAA 콘솔에서 CloudFormation 스택을 사용하여 배포한 환경으로 이동합니다.

아직 상태가 아닌 경우 유효한, 해당 상태에 도달할 때까지 기다리십시오. CloudFormation 스택을 배포한 후 30분 이상 걸리지 않습니다.

  1. 환경 세부 정보를 보려면 표에서 환경 링크를 선택하세요.

이전 단계에서 사용한 버킷 및 폴더에서 DAG를 선택하도록 구성되었습니다. Airflow는 해당 폴더의 변경 사항을 모니터링합니다.

  1. 왼쪽 메뉴에서 Airflow UI 열기 Airflow UI에 액세스하는 새 탭을 열고 통합 IAM 보안을 사용하여 로그인합니다.

생성한 DAG 파일에 문제가 있는 경우 페이지 상단에 영향을 받은 줄을 나타내는 오류가 표시됩니다. 이 경우 단계를 검토한 후 다시 업로드하세요. 몇 초 후에 이를 구문 분석하고 오류 배너를 업데이트하거나 제거합니다.

  1. 관리자 메뉴, 선택 변수.
  2. 다음 키와 값을 사용하여 세 개의 변수를 추가합니다.
    1. glue_job_dag.glue_connections 가치있는 MWAA-Glue-Blog-Subnet1,MWAA-Glue-Blog-Subnet2.
    2. glue_job_dag.glue_job_name 가치있는 AirflowBlogJob.
    3. glue_job_dag.strategy 가치있는 capacity.

동적 서브넷 할당으로 작업 실행

이제 워크플로를 실행하고 연결 순서를 동적으로 재정렬하는 전략을 확인할 준비가 되었습니다.

  1. Airflow UI에서 다음을 선택합니다. DAG, 그리고 행에 glue_job_dag에서 재생 아이콘을 선택하세요.
  2. 검색 메뉴, 선택 태스크 인스턴스.
  3. 인스턴스 테이블에서 오른쪽으로 스크롤하여 Log Url 그 위에 있는 아이콘을 선택하여 로그를 엽니다.

작업이 실행되면 로그가 업데이트됩니다. "연결 순서로 Glue 작업 실행 중:"으로 시작하는 줄과 연결 IP 및 할당된 범주의 세부 정보를 표시하는 이전 줄을 찾을 수 있습니다. 오류가 발생하면 이 로그에 세부정보가 표시됩니다.

  1. AWS Glue 콘솔에서 ETL 작업 탐색 창에서 작업을 선택하세요. AirflowBlogJob.
  2. 실행 탭에서 실행 인스턴스를 선택한 다음 출력 로그 링크를 클릭하면 새 탭이 열립니다.
  3. 새 탭에서 로그 스트림 링크를 사용하여 엽니다.

드라이버가 할당된 IP와 드라이버가 속한 서브넷이 표시됩니다. 이는 Airflow에 표시된 연결과 일치해야 합니다(로그가 표시되지 않으면 다음을 선택하세요). 이력서 따라서 사용 가능한 즉시 업데이트됩니다.)

  1. Airflow UI에서 Airflow 변수를 편집합니다. glue_job_dag.strategy 로 설정 random.
  2. DAG를 여러 번 실행하고 순서가 어떻게 변경되는지 확인하세요.

정리

배포가 더 이상 필요하지 않은 경우 리소스를 삭제하여 추가 비용이 발생하지 않도록 하세요.

  1. 업로드한 Python 스크립트를 삭제하면 다음 단계에서 S3 버킷이 자동으로 삭제될 수 있습니다.
  2. CloudFormation 스택을 삭제합니다.
  3. AWS Glue 작업을 삭제합니다.
  4. 작업이 Amazon S3에 저장한 스크립트를 삭제합니다.
  5. 이 게시물의 일부로 생성한 연결을 삭제하세요.

결론

이 게시물에서는 AWS Glue와 Amazon MWAA가 어떻게 협력하여 운영 및 관리 오버헤드를 최소화하면서 더욱 발전된 사용자 지정 워크플로를 구축할 수 있는지 보여주었습니다. 이 솔루션을 사용하면 특별한 운영, 네트워크 또는 보안 요구 사항을 충족하기 위해 AWS Glue 작업이 실행되는 방식을 더 효과적으로 제어할 수 있습니다.

다음과 같은 다양한 방법으로 자체 Amazon MWAA 환경을 배포할 수 있습니다. 이 템플릿 이 게시물, Amazon MWAA 콘솔 또는 AWS CLI. 또한 네트워크 아키텍처 및 요구 사항에 따라 AWS Glue 작업을 조정하는 자체 전략을 구현할 수도 있습니다(예: 가능한 경우 데이터에 더 가까운 작업 실행).


저자 소개

마이클 그린스타인 공공 부문의 분석 전문가 솔루션 설계자입니다.

곤잘로 헤레 로스 AWS Glue 팀의 수석 빅 데이터 설계자입니다.

spot_img

VC 카페

최신 인텔리전스

라이프사이VC

VC 카페

spot_img