제퍼넷 로고

Apache Airflow 버전 2.8.1에 대한 Amazon MWAA 지원 소개 | 아마존 웹 서비스

시간

Apache Airflow용 Amazon 관리형 워크플로 (Amazon MWAA)는 다음을 위한 관리형 오케스트레이션 서비스입니다. 아파치 에어 플로우 따라서 클라우드에서 엔드투엔드 데이터 파이프라인을 간단하게 설정하고 운영할 수 있습니다.

조직에서는 Amazon MWAA를 사용하여 비즈니스 워크플로를 향상합니다. 예를 들어, C2i 유전체학 데이터 플랫폼에서 Amazon MWAA를 사용하여 수십억 개의 레코드에 있는 암 유전체학 데이터를 처리하는 알고리즘의 검증을 조율합니다. 씰룩 씰룩 움직이다라이브 스트리밍 플랫폼인 은 140억 XNUMX천만 명 이상의 활성 사용자를 대상으로 추천 모델의 교육 및 배포를 관리하고 조정합니다. 그들은 Amazon MWAA를 사용하여 확장하는 동시에 보안을 크게 향상하고 인프라 관리 오버헤드를 줄입니다.

오늘 우리는 Amazon MWAA에서 Apache Airflow 버전 2.8.1 환경을 사용할 수 있음을 발표합니다. 이 게시물에서는 이제 Amazon MWAA에서 사용할 수 있는 Airflow의 새로운 기능 중 일부와 Amazon MWAA 환경을 설정하거나 버전 2.8.1로 업그레이드하는 방법을 안내합니다.

개체 저장소

데이터 파이프라인이 확장됨에 따라 엔지니어는 고유한 API, 인증 방법, 데이터 액세스 규칙을 사용하여 여러 시스템에서 스토리지를 관리하는 데 어려움을 겪으며, 이에 따라 맞춤형 로직과 스토리지별 연산자가 필요합니다. 이제 Airflow는 이러한 세부 정보를 처리하는 통합 객체 스토리지 추상화 계층을 제공하므로 엔지니어는 데이터 파이프라인에 집중할 수 있습니다. Airflow 객체 스토리지 사용 fsspec 다양한 객체 스토리지 시스템 전반에 걸쳐 일관된 데이터 액세스 코드를 활성화하여 인프라 복잡성을 간소화합니다.

다음은 이 기능의 주요 이점 중 일부입니다.

  • 휴대용 워크플로 – DAG(방향성 비순환 그래프)를 최소한으로 변경하여 스토리지 서비스를 전환할 수 있습니다.
  • 효율적인 데이터 전송 – 메모리에 로드하는 대신 데이터를 스트리밍할 수 있습니다.
  • 유지 보수 감소 – 별도의 연산자가 필요하지 않으므로 파이프라인을 유지 관리하기가 쉽습니다.
  • 익숙한 프로그래밍 경험 – 다음과 같은 Python 모듈을 사용할 수 있습니다. 셔틀, 파일 작업용

객체 스토리지를 사용하려면 아마존 단순 스토리지 서비스 (Amazon S3), 당신은 패키지를 추가로 설치 Amazon 공급자의 s3fs(apache-airflow-providers-amazon[s3fs]==x.x.x).

아래 샘플 코드에서는 데이터를 직접 이동하는 방법을 볼 수 있습니다. Google 클라우드 저장소 아마존 S3로. Airflow의 객체 스토리지는 shutil.copyfileobj, 객체의 데이터는 다음에서 청크로 읽혀집니다. gcs_data_source 그리고 스트리밍된 amazon_s3_data_target.

gcs_data_source = ObjectStoragePath("gcs://source-bucket/prefix/", conn_id="google_cloud_default")

amazon_s3_data_target = ObjectStoragePath("s3://target-bucket/prefix/", conn_id="aws_default ")

with DAG(
    dag_id="copy_from_gcs_to_amazon_s3",
    start_date=datetime(2024, 2, 26),
    schedule="0 0 * * *",
    catchup=False,    
    tags=["2.8", "ObjectStorage"],
) as dag:

    def list_objects(path: ObjectStoragePath) -> list[ObjectStoragePath]:
        objects = [f for f in path.iterdir() if f.is_file()]
        return objects

    def copy_object(path: ObjectStoragePath, object: ObjectStoragePath):    
        object.copy(dst=path)

    objects_list = list_objects(path=gcs_data_source)
    copy_object.partial(path=amazon_s3_data_target).expand(object=objects_list)

Airflow 객체 스토리지에 대한 자세한 내용은 다음을 참조하세요. 객체 저장소.

엑스컴 UI

XCom(교차 통신)은 작업 간 데이터 전달을 허용하여 작업 간 통신 및 조정을 촉진합니다. 이전에는 개발자가 작업과 관련된 XCom을 보려면 다른 보기로 전환해야 했습니다. Airflow 2.8을 사용하면 XCom 키-값이 다음 스크린샷과 같이 Airflow 그리드 보기 내의 탭에 직접 렌더링됩니다.

새로운 엑스컴 탭은 다음과 같은 이점을 제공합니다.

  • 향상된 XCom 가시성 – UI의 전용 탭은 DAG 또는 작업과 관련된 모든 XCom을 볼 수 있는 편리하고 사용자 친화적인 방법을 제공합니다.
  • 향상된 디버깅 – UI에서 직접 XCom 값을 볼 수 있으면 DAG 디버깅에 도움이 됩니다. Python 코드를 사용하여 수동으로 가져오고 검사할 필요 없이 업스트림 작업의 출력을 빠르게 확인할 수 있습니다.

작업 컨텍스트 로거

Airflow에서 데이터 파이프라인이 원활하게 작동하려면 작업 수명 주기를 관리하는 것이 중요합니다. 그러나 특히 작업이 예기치 않게 중지되는 시나리오에서 특정 문제가 지속되었습니다. 이는 스케줄러 시간 초과, 좀비 작업(하트비트를 보내지 않고 실행 상태로 유지되는 작업) 또는 작업자의 메모리가 부족한 인스턴스입니다.

전통적으로 이러한 실패, 특히 스케줄러나 실행기와 같은 핵심 Airflow 구성요소에 의해 트리거된 실패는 작업 로그에 기록되지 않았습니다. 이러한 제한으로 인해 사용자는 Airflow UI 외부에서 문제를 해결해야 했으며, 이로 인해 문제를 정확히 찾아내고 해결하는 프로세스가 복잡해졌습니다.

Airflow 2.8에서는 이 문제를 해결하는 중요한 개선 사항을 도입했습니다. 스케줄러 및 실행기를 포함한 Airflow 구성요소는 이제 새로운 기능을 사용할 수 있습니다. TaskContextLogger 오류 메시지를 작업 로그에 직접 전달합니다. 이 기능을 사용하면 작업 실행과 관련된 모든 관련 오류 메시지를 한 곳에서 볼 수 있습니다. 이는 작업이 실패한 이유를 파악하는 프로세스를 단순화하고 단일 로그 보기 내에서 무엇이 잘못되었는지에 대한 완전한 관점을 제공합니다.

다음 스크린샷은 작업이 어떻게 감지되는지 보여줍니다. zombie, 스케줄러 로그는 작업 로그의 일부로 포함됩니다.

환경 구성 매개변수를 설정해야 합니다. enable_task_context_loggerTrue, 기능을 활성화합니다. 활성화되면 Airflow는 스케줄러, 실행자 또는 콜백 실행 컨텍스트의 로그를 작업 로그로 전달하고 이를 Airflow UI에서 사용할 수 있도록 할 수 있습니다.

데이터세트용 리스너 후크

데이터 세트 Airflow 2.4에서는 DAG 간의 데이터 인식 스케줄링 및 종속성을 생성하기 위한 데이터 소스의 논리적 그룹화로 도입되었습니다. 예를 들어 생산자 DAG가 데이터 세트를 업데이트할 때 소비자 DAG가 실행되도록 예약할 수 있습니다. 청취자 Airflow 사용자가 환경에서 발생하는 특정 이벤트에 대한 구독을 생성할 수 있도록 합니다. Airflow 2.8에서는 두 가지 데이터 세트 이벤트에 대한 리스너가 추가되었습니다. on_dataset_createdon_dataset_changed, Airflow 사용자가 데이터 세트 관리 작업에 대응하는 커스텀 코드를 효과적으로 작성할 수 있습니다. 예를 들어 외부 시스템을 트리거하거나 알림을 보낼 수 있습니다.

데이터세트에 리스너 후크를 사용하는 것은 간단합니다. 리스너를 생성하려면 다음 단계를 완료하세요. on_dataset_changed:

  1. 리스너 생성(dataset_listener.py):
    from airflow import Dataset
    from airflow.listeners import hookimpl
    
    @hookimpl
    def on_dataset_changed(dataset: Dataset):
        """Following custom code is executed when a dataset is changed."""
        print("Invoking external endpoint")
    
        """Validating a specific dataset"""
        if dataset.uri == "s3://bucket-prefix/object-key.ext":
            print ("Execute specific/different action for this dataset")

  2. Airflow 환경에 리스너를 등록하는 플러그인을 만듭니다(dataset_listener_plugin.py):
    from airflow.plugins_manager import AirflowPlugin
    from plugins import listener_code
    
    class DatasetListenerPlugin(AirflowPlugin):
        name = "dataset_listener_plugin"
        listeners = [dataset_listener]

Amazon MWAA에 플러그인을 설치하는 방법에 대한 자세한 내용은 다음을 참조하십시오. 사용자 정의 플러그인 설치.

Amazon MWAA에서 새로운 Airflow 2.8.1 환경 설정

당신은 설치 귀하의 계정과 선호하는 지역에서 AWS 관리 콘솔, API 또는 AWS 명령 줄 인터페이스 (AWS CLI). IaC(코드형 인프라)를 채택하는 경우 다음을 사용하여 설정을 자동화할 수 있습니다. AWS 클라우드 포메이션Walk Through California 프로그램, AWS 클라우드 개발 키트 (AWS CDK) 또는 Terraform 스크립트.

Amazon MWAA에서 Airflow 버전 2.8.1 환경이 성공적으로 생성되면 특정 패키지가 스케줄러 및 작업자 노드에 자동으로 설치됩니다. 설치된 패키지 및 해당 버전의 전체 목록은 다음을 참조하세요. Amazon MWAA 환경에 설치된 Apache Airflow 공급자 패키지. 요구 사항 파일을 사용하여 추가 패키지를 설치할 수 있습니다.

이전 버전의 Airflow에서 버전 2.8.1로 업그레이드

전체 버전 업그레이드를 사용하여 이전 Airflow 버전 2.x 기반 환경을 버전 2.8.1로 업그레이드하면 이러한 최신 기능을 활용할 수 있습니다. 전체 버전 업그레이드에 대한 자세한 내용은 다음을 참조하세요. Apache Airflow 버전 업그레이드 or Amazon MWAA를 통한 인플레이스 버전 업그레이드 소개.

결론

이 게시물에서는 객체 스토리지, 그리드 보기에 추가된 새로운 XCom 탭, 작업 컨텍스트 로깅, 데이터세트용 리스너 후크 및 이를 사용하는 방법과 같이 Airflow 버전 2.8에 도입된 몇 가지 중요한 기능에 대해 논의했습니다. 또한 Amazon MWAA의 구현을 보여주기 위해 몇 가지 샘플 코드도 제공했습니다. 전체 변경 목록은 다음을 참조하세요. Airflow의 출시 노트.

Amazon MWAA에 대한 추가 세부 정보 및 코드 예제는 다음을 방문하십시오. Amazon MWAA 사용 설명서 그리고 Amazon MWAA 예제 GitHub 리포지토리.

Apache, Apache Airflow 및 Airflow는 다음의 등록 상표 또는 상표입니다. Apache Software Foundation 미국 및 / 또는 기타 국가에서.


저자에 관하여

만시 부타다 네덜란드에 본사를 둔 ISV 솔루션 설계자입니다. 그녀는 고객이 비즈니스 문제를 해결하는 잘 설계된 AWS 솔루션을 설계하고 구현하도록 돕습니다. 그녀는 데이터 분석과 네트워킹에 열정을 갖고 있습니다. 일 외에도 그녀는 음식 실험, 피클볼 게임, 재미있는 보드 게임에 빠져드는 것을 즐깁니다.

에르난 가르시아 네덜란드에 본사를 둔 AWS의 수석 솔루션 아키텍트입니다. 그는 금융 서비스 업계에서 근무하며 기업의 클라우드 도입을 지원합니다. 그는 서버리스 기술, 보안 및 규정 준수에 열정을 갖고 있습니다. 그는 가족, 친구들과 함께 시간을 보내고 다양한 요리의 새로운 요리를 맛보는 것을 즐깁니다.

spot_img

최신 인텔리전스

spot_img