Zephyrnet Logo

Apresentando o suporte do Amazon MWAA para Apache Airflow versão 2.8.1 | Amazon Web Services

Data:

Fluxos de trabalho gerenciados da Amazon para Apache Airflow (Amazon MWAA) é um serviço de orquestração gerenciado para Fluxo de ar Apache isso facilita a configuração e a operação de pipelines de dados ponta a ponta na nuvem.

As organizações usam o Amazon MWAA para aprimorar seus fluxos de trabalho de negócios. Por exemplo, Genômica C2i usa o Amazon MWAA em sua plataforma de dados para orquestrar a validação de algoritmos que processam dados genômicos do câncer em bilhões de registros. Twitch, uma plataforma de transmissão ao vivo, gerencia e orquestra o treinamento e a implantação de seus modelos de recomendação para mais de 140 milhões de usuários ativos. Eles usam o Amazon MWAA para escalar, melhorando significativamente a segurança e reduzindo a sobrecarga de gerenciamento de infraestrutura.

Hoje, estamos anunciando a disponibilidade dos ambientes Apache Airflow versão 2.8.1 no Amazon MWAA. Nesta postagem, mostramos alguns dos novos recursos e capacidades do Airflow agora disponíveis no Amazon MWAA e como você pode configurar ou atualizar seu ambiente Amazon MWAA para a versão 2.8.1.

Armazenamento de objetos

À medida que os pipelines de dados aumentam, os engenheiros lutam para gerenciar o armazenamento em vários sistemas com APIs, métodos de autenticação e convenções exclusivos para acessar dados, exigindo lógica personalizada e operadores específicos de armazenamento. O Airflow agora oferece uma camada de abstração de armazenamento de objetos unificada que lida com esses detalhes, permitindo que os engenheiros se concentrem em seus pipelines de dados. Usos de armazenamento de objetos do Airflow fsspec para permitir código de acesso a dados consistente em diferentes sistemas de armazenamento de objetos, simplificando assim a complexidade da infraestrutura.

A seguir estão alguns dos principais benefícios do recurso:

  • Fluxos de trabalho portáteis – Você pode alternar serviços de armazenamento com alterações mínimas em seus gráficos acíclicos direcionados (DAGs)
  • Transferências de dados eficientes – Você pode transmitir dados em vez de carregá-los na memória
  • Manutenção reduzida – Você não precisa de operadores separados, o que facilita a manutenção dos seus pipelines
  • Experiência de programação familiar – Você pode usar módulos Python, como shutil, para operações de arquivo

Para usar o armazenamento de objetos com Serviço de armazenamento simples da Amazon (Amazon S3), você precisa instale o pacote extra s3fs com o provedor Amazon (apache-airflow-providers-amazon[s3fs]==x.x.x).

No exemplo de código abaixo, você pode ver como mover dados diretamente do Google Cloud Storage para Amazon S3. Como o armazenamento de objetos do Airflow usa shutil.copyfileobj, os dados dos objetos são lidos em pedaços de gcs_data_source e transmitido para amazon_s3_data_target.

gcs_data_source = ObjectStoragePath("gcs://source-bucket/prefix/", conn_id="google_cloud_default")

amazon_s3_data_target = ObjectStoragePath("s3://target-bucket/prefix/", conn_id="aws_default ")

with DAG(
    dag_id="copy_from_gcs_to_amazon_s3",
    start_date=datetime(2024, 2, 26),
    schedule="0 0 * * *",
    catchup=False,    
    tags=["2.8", "ObjectStorage"],
) as dag:

    def list_objects(path: ObjectStoragePath) -> list[ObjectStoragePath]:
        objects = [f for f in path.iterdir() if f.is_file()]
        return objects

    def copy_object(path: ObjectStoragePath, object: ObjectStoragePath):    
        object.copy(dst=path)

    objects_list = list_objects(path=gcs_data_source)
    copy_object.partial(path=amazon_s3_data_target).expand(object=objects_list)

Para obter mais informações sobre o armazenamento de objetos do Airflow, consulte Armazenamento de objetos.

UI XCom

XCom (comunicação cruzada) permite a passagem de dados entre tarefas, facilitando a comunicação e coordenação entre elas. Anteriormente, os desenvolvedores tinham que mudar para uma visão diferente para ver os XComs relacionados a uma tarefa. Com o Airflow 2.8, os valores-chave do XCom são renderizados diretamente em uma guia na visualização Airflow Grid, conforme mostrado na captura de tela a seguir.

O novo xcom fornece os seguintes benefícios:

  • Visibilidade XCom aprimorada – Uma guia dedicada na interface do usuário fornece uma maneira conveniente e fácil de ver todos os XComs associados a um DAG ou tarefa.
  • Depuração aprimorada – Ser capaz de ver os valores XCom diretamente na IU é útil para depurar DAGs. Você pode ver rapidamente a saída das tarefas upstream sem precisar extraí-las e inspecioná-las manualmente usando o código Python.

Registrador de contexto de tarefa

O gerenciamento dos ciclos de vida das tarefas é crucial para o bom funcionamento dos pipelines de dados no Airflow. No entanto, certos desafios persistiram, especialmente em cenários em que as tarefas são interrompidas inesperadamente. Isso pode ocorrer por vários motivos, incluindo tempos limite do agendador, zumbi tarefas (tarefas que permanecem em estado de execução sem enviar pulsações) ou instâncias em que o trabalhador fica sem memória.

Tradicionalmente, essas falhas, especialmente aquelas acionadas pelos principais componentes do Airflow, como o agendador ou o executor, não eram registradas nos logs de tarefas. Essa limitação exigia que os usuários solucionassem problemas fora da IU do Airflow, complicando o processo de identificação e resolução de problemas.

O Airflow 2.8 introduziu uma melhoria significativa que resolve esse problema. Os componentes do Airflow, incluindo o agendador e o executor, agora podem usar o novo TaskContextLogger para encaminhar mensagens de erro diretamente para os logs de tarefas. Este recurso permite que você veja todas as mensagens de erro relevantes relacionadas à execução de uma tarefa em um só lugar. Isso simplifica o processo de descobrir por que uma tarefa falhou, oferecendo uma perspectiva completa do que deu errado em uma única visualização de log.

A captura de tela a seguir mostra como a tarefa é detectada como zombiee o log do planejador está sendo incluído como parte do log de tarefas.

Você precisa definir o parâmetro de configuração do ambiente enable_task_context_logger para True, para ativar o recurso. Depois de ativado, o Airflow pode enviar logs do agendador, do executor ou do contexto de execução de retorno de chamada para os logs de tarefas e disponibilizá-los na IU do Airflow.

Ganchos de ouvinte para conjuntos de dados

Conjuntos de dados foram introduzidos no Airflow 2.4 como um agrupamento lógico de fontes de dados para criar agendamento com reconhecimento de dados e dependências entre DAGs. Por exemplo, você pode agendar um DAG de consumidor para ser executado quando um DAG de produtor atualizar um conjunto de dados. Ouvintes permitir que os usuários do Airflow criem assinaturas para determinados eventos que acontecem no ambiente. No Airflow 2.8, os listeners são adicionados para dois eventos de conjuntos de dados: on_dataset_created e on_dataset_changed, permitindo efetivamente que os usuários do Airflow escrevam códigos personalizados para reagir às operações de gerenciamento de conjuntos de dados. Por exemplo, você pode acionar um sistema externo ou enviar uma notificação.

Usar ganchos de ouvinte para conjuntos de dados é simples. Conclua as etapas a seguir para criar um ouvinte para on_dataset_changed:

  1. Crie o ouvinte (dataset_listener.py):
    from airflow import Dataset
    from airflow.listeners import hookimpl
    
    @hookimpl
    def on_dataset_changed(dataset: Dataset):
        """Following custom code is executed when a dataset is changed."""
        print("Invoking external endpoint")
    
        """Validating a specific dataset"""
        if dataset.uri == "s3://bucket-prefix/object-key.ext":
            print ("Execute specific/different action for this dataset")

  2. Crie um plug-in para registrar o ouvinte em seu ambiente Airflow (dataset_listener_plugin.py):
    from airflow.plugins_manager import AirflowPlugin
    from plugins import listener_code
    
    class DatasetListenerPlugin(AirflowPlugin):
        name = "dataset_listener_plugin"
        listeners = [dataset_listener]

Para obter mais informações sobre como instalar plug-ins no Amazon MWAA, consulte Instalando plug-ins personalizados.

Configure um novo ambiente do Airflow 2.8.1 no Amazon MWAA

Você pode iniciar o instalação em sua conta e região preferida usando o Console de gerenciamento da AWS, API ou Interface de linha de comando da AWS (AWS CLI). Se você estiver adotando infraestrutura como código (IaC), poderá automatizar a configuração usando Formação da Nuvem AWS, Kit de desenvolvimento em nuvem da AWS (AWS CDK) ou scripts Terraform.

Após a criação bem-sucedida de um ambiente Airflow versão 2.8.1 no Amazon MWAA, determinados pacotes são instalados automaticamente no agendador e nos nós de trabalho. Para obter uma lista completa de pacotes instalados e suas versões, consulte Pacotes do provedor Apache Airflow instalados em ambientes Amazon MWAA. Você pode instalar pacotes adicionais usando um arquivo de requisitos.

Atualize de versões mais antigas do Airflow para a versão 2.8.1

Você pode aproveitar esses recursos mais recentes atualizando seus ambientes antigos baseados na versão 2.x do Airflow para a versão 2.8.1 usando atualizações de versão locais. Para saber mais sobre atualizações de versão no local, consulte Atualizando a versão do Apache Airflow or Apresentando atualizações de versão no local com Amazon MWAA.

Conclusão

Nesta postagem, discutimos alguns recursos importantes introduzidos no Airflow versão 2.8, como armazenamento de objetos, a nova guia XCom adicionada à visualização em grade, registro de contexto de tarefa, ganchos de ouvinte para conjuntos de dados e como você pode começar a usá-los. Também fornecemos alguns exemplos de código para mostrar implementações no Amazon MWAA. Para a lista completa de alterações, consulte Notas de lançamento do Airflow.

Para obter detalhes adicionais e exemplos de código no Amazon MWAA, visite o Guia do usuário do Amazon MWAA e os votos de Repo do GitHub de exemplos do Amazon MWAA.

Apache, Apache Airflow e Airflow são marcas registradas ou marcas comerciais da Fundação Apache Software nos Estados Unidos e / ou outros países.


Sobre os autores

Mansi Butada é um arquiteto de soluções ISV baseado na Holanda. Ela ajuda os clientes a projetar e implementar soluções bem arquitetadas na AWS que resolvem seus problemas de negócios. Ela é apaixonada por análise de dados e networking. Além do trabalho, ela gosta de experimentar comida, jogar pickleball e mergulhar em divertidos jogos de tabuleiro.

Hernán Garcia é arquiteto de soluções sênior na AWS e baseado na Holanda. Ele trabalha no setor de serviços financeiros, apoiando empresas na adoção da nuvem. Ele é apaixonado por tecnologias sem servidor, segurança e conformidade. Ele gosta de passar tempo com a família e amigos e experimentar novos pratos de diferentes cozinhas.

local_img

Inteligência mais recente

local_img