Zephyrnet Logo

Amazon Managed Service for Apache Flink agora oferece suporte ao Apache Flink versão 1.18 | Amazon Web Services

Data:

Apache Flink é um mecanismo de processamento distribuído de código aberto, que oferece interfaces de programação poderosas para processamento em fluxo e em lote, com suporte de primeira classe para processamento com estado e semântica de tempo de evento. Apache Flink oferece suporte a várias linguagens de programação, Java, Python, Scala, SQL e várias APIs com diferentes níveis de abstração, que podem ser usadas de forma intercambiável no mesmo aplicativo.

Serviço gerenciado da Amazon para Apache Flink, que oferece uma experiência totalmente gerenciada e sem servidor na execução de aplicativos Apache Flink, agora oferece suporte Apache Flink 1.18.1, a versão mais recente do Apache Flink no momento em que este artigo foi escrito.

Nesta postagem, discutimos alguns dos novos recursos e capacidades interessantes do Apache Flink, introduzidos nas versões principais mais recentes, 1.16, 1.17 e 1.18, e agora com suporte no Managed Service para Apache Flink.

Novos conectores

Antes de nos aprofundarmos nas novas funcionalidades do Apache Flink disponíveis na versão 1.18.1, vamos explorar os novos recursos provenientes da disponibilidade de muitos novos conectores de código aberto.

Opensearch

Um dedicado Opensearch O conector agora está disponível para ser incluído em seus projetos, permitindo que um aplicativo Apache Flink grave dados diretamente no OpenSearch, sem depender do modo de compatibilidade do Elasticsearch. Este conector é compatível com Serviço Amazon OpenSearch provisionado e Serviço OpenSearch sem servidor.

Este novo conector suporta APIs SQL e de tabela, trabalhando com Java e Python, e o API DataStream, apenas para Java. Pronto para uso, ele fornece garantias de pelo menos uma vez, sincronizando as gravações com o checkpoint do Flink. Você pode obter semântica exatamente uma vez usando IDs determinísticos e método upsert.

Por padrão, o conector usa bibliotecas de cliente do OpenSearch versão 1.x. Você pode mudar para a versão 2.x adicionando as dependências corretas.

Amazon DynamoDB

Os desenvolvedores do Apache Flink agora podem usar um conector dedicado para gravar dados Amazon DynamoDB. Este conector é baseado no Apache Flink AsyncSink, desenvolvido pela AWS e agora parte integrante do projeto Apache Flink, para simplificar a implementação de conectores de coletor eficientes, usando solicitações de gravação sem bloqueio e lotes adaptáveis.

Este conector também suporta ambos SQL e Tabela APIs, Java e Python, e Fluxo de dados API, apenas para Java. Por padrão, o coletor grava em lotes para otimizar o rendimento. Um recurso notável da versão SQL é o suporte para a cláusula PARTITIONED BY. Ao especificar uma ou mais chaves, você pode obter alguma desduplicação do lado do cliente, enviando apenas o registro mais recente por chave em cada gravação em lote. Um equivalente pode ser alcançado com a API DataStream especificando uma lista de chaves de partição para substituição em cada lote.

Este conector funciona apenas como coletor. Você não pode usá-lo para leitura no DynamoDB. Para pesquisar dados no DynamoDB, você ainda precisa implementar uma pesquisa usando o API de E/S assíncrona Flink ou implementar uma função personalizada definida pelo usuário (UDF), para SQL.

MongoDB

Outro conector interessante é para MongoDB. Neste caso, tanto a fonte quanto o sumidouro estão disponíveis, tanto para o SQL e Tabela APIs e Fluxo de dados API. O novo conector agora faz parte oficialmente do projeto Apache Flink e é apoiado pela comunidade. Este novo conector substitui o antigo fornecido diretamente pelo MongoDB, que suporta apenas APIs Flink Sink e Source mais antigas.

Tal como acontece com outros conectores de armazenamento de dados, a origem pode ser usada como uma origem limitada, no modo em lote ou para pesquisas. O coletor funciona tanto em modo de lote quanto em streaming, suportando o modo de upsert e de acréscimo.

Entre os muitos recursos notáveis ​​deste conector, um que vale a pena mencionar é a capacidade de ativar o cache ao usar a fonte para pesquisas. Fora da caixa, a pia oferece garantias de pelo menos uma vez. Quando uma chave primária é definida, o coletor pode suportar semântica exatamente uma vez por meio de upserts idempotentes. O conector sink também oferece suporte à semântica exatamente uma vez, com upserts idempotentes, quando a chave primária é definida.

Novo versionamento de conector

Não é um recurso novo, mas um fator importante a considerar ao atualizar um aplicativo Apache Flink mais antigo, é o novo controle de versão do conector. A partir do Apache Flink versão 1.17, a maioria dos conectores foram externalizados da distribuição principal do Apache Flink e seguem versionamento independente.

Para incluir a dependência correta, você precisa especificar a versão do artefato com o formulário: <connector-version>-<flink-version>

Por exemplo, o conector Kafka mais recente, também funcionando com Amazon Managed Streaming para Apache Kafka (Amazon MSK), no momento em que este artigo foi escrito, era a versão 3.1.0. Se você estiver usando o Apache Flink 1.18, a dependência a ser usada será a seguinte:

<dependency> 
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId> 
    <version>3.1.0-1.18</version>
</dependency>

Escolha Amazon Kinesis, a nova versão do conector é 4.2.0. A dependência do Apache Flink 1.18 será a seguinte:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis</artifactId> 
    <version>4.2.0-1.18</version>
</dependency>

Nas seções a seguir, discutiremos mais sobre os novos recursos poderosos agora disponíveis no Apache Flink 1.18 e com suporte no Amazon Managed Service para Apache Flink.

SQL

No Apache Flink SQL, os usuários podem fornecer dicas para juntar consultas que podem ser usadas para sugerir que o otimizador tenha efeito no plano de consulta. Em particular, em aplicações de streaming, junções de pesquisa são usados ​​para enriquecer uma tabela, representando dados de streaming, com dados consultados de um sistema externo, normalmente um banco de dados. Desde a versão 1.16, diversas melhorias foram introduzidas nas junções de pesquisa, permitindo ajustar o comportamento da junção e melhorar o desempenho:

  • Cache de pesquisa é um recurso poderoso que permite armazenar em cache na memória os registros usados ​​com mais frequência, reduzindo a pressão no banco de dados. Anteriormente, o cache de pesquisa era específico para alguns conectores. Desde o Apache Flink 1.16, esta opção tornou-se disponível para todos os conectores que suportam pesquisa internamente (FLIP-221). No momento em que este livro foi escrito, JDBC, Colméia e HBase conectores suportam cache de pesquisa. O cache de pesquisa possui três modos disponíveis: FULL, para um pequeno conjunto de dados que pode ser mantido inteiramente na memória, PARTIAL, para um grande conjunto de dados, armazenando em cache apenas os registros mais recentes ou NONE, para desabilitar completamente o cache. Para PARTIAL cache, você também pode configurar o número de linhas para buffer e o tempo de vida.
  • Pesquisa assíncrona é outro recurso que pode melhorar muito o desempenho. A pesquisa assíncrona fornece no Apache Flink SQL uma funcionalidade semelhante a E/S assíncrona disponível na API DataStream. Ele permite que o Apache Flink emita novas solicitações ao banco de dados sem bloquear o thread de processamento até que as respostas às pesquisas anteriores sejam recebidas. Da mesma forma que a E/S assíncrona, você pode configurar a pesquisa assíncrona para impor a ordem ou permitir resultados não ordenados, ou ajustar a capacidade do buffer e o tempo limite.
  • Você também pode configurar um estratégia de repetição de pesquisa em combinação com PARTIAL or NONE cache de pesquisa, para configurar o comportamento em caso de falha na pesquisa no banco de dados externo.

Todos esses comportamentos podem ser controlados usando um LOOKUP dica, como no exemplo a seguir, onde mostramos uma junção de pesquisa usando pesquisa assíncrona:

SELECT 
    /*+ LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered') */ 
    O.order_id, O.total, C.address
FROM Orders AS O 
JOIN Customers FOR SYSTEM_TIME AS OF O.proc_time AS C 
  ON O.customer_id = O.customer_id

PyFlink

Nesta seção, discutimos novas melhorias e suporte no PyFlink.

Suporte Python 3.10

As versões mais recentes do Apache Flink introduziram várias melhorias para usuários do PyFlink. Em primeiro lugar, o Python 3.10 agora é compatível e o suporte ao Python 3.6 foi completamente removido (FLINK-29421). O Managed Service for Apache Flink atualmente usa o tempo de execução Python 3.10 para executar aplicativos PyFlink.

Aproximando-se da paridade de recursos

Do ponto de vista da API de programação, o PyFlink está cada vez mais próximo do Java em todas as versões. A API DataStream agora oferece suporte a recursos como saídas secundárias e estado de transmissão, e as lacunas na API de janelas foram fechadas. PyFlink agora também oferece suporte a novos conectores como Fluxos de dados do Amazon Kinesis diretamente da API DataStream.

Melhorias no modo Thread

PyFlink é muito eficiente. A sobrecarga de execução dos operadores da API Flink no PyFlink é mínima em comparação com Java ou Scala, porque o tempo de execução na verdade executa a implementação do operador diretamente na JVM, independentemente da linguagem do seu aplicativo. Mas quando você tem uma função definida pelo usuário, as coisas são um pouco diferentes. Uma linha de código Python tão simples quanto lambda x: x + 1, ou tão complexa quanto uma função Pandas, deve ser executada em um tempo de execução Python.

Por padrão, o Apache Flink executa um tempo de execução Python em cada Gerenciador de Tarefas, externo à JVM. Cada registro é serializado, entregue ao tempo de execução do Python por meio de comunicação entre processos, desserializado e processado no tempo de execução do Python. O resultado é então serializado e devolvido à JVM, onde é desserializado. Este é o PyFlink Modo PROCESSO. É muito estável, mas introduz uma sobrecarga e, em alguns casos, pode se tornar um gargalo de desempenho.

Desde a versão 1.15, o Apache Flink também suporta Modo LINHA para PyFlink. Nesse modo, as funções Python definidas pelo usuário são executadas dentro da própria JVM, removendo a serialização/desserialização e a sobrecarga de comunicação entre processos. O modo THREAD tem algumas limitações; por exemplo, o modo THREAD não pode ser usado para Pandas ou UDAFs (funções agregadas definidas pelo usuário, consistindo em muitos registros de entrada e um registro de saída), mas pode melhorar substancialmente o desempenho de um aplicativo PyFlink.

Com a versão 1.16, o suporte ao modo THREAD foi substancialmente ampliado, abrangendo também a API Python DataStream.

O modo THREAD é suportado pelo Managed Service for Apache Flink e pode ser habilitado diretamente de seu aplicativo PyFlink.

Suporte Apple Silicon

Se você usa máquinas baseadas em Apple Silicon para desenvolver aplicativos PyFlink, desenvolvendo para PyFlink 1.15, provavelmente encontrou alguns dos problemas conhecidos de dependência de Python no Apple Silicon. Esses problemas foram finalmente resolvidos (FLINK-25188). Essas limitações não afetaram os aplicativos PyFlink em execução no Managed Service for Apache Flink. Antes da versão 1.16, se você quisesse desenvolver um aplicativo PyFlink em uma máquina usando chipset M1, M2 ou M3, você teria que usar alguns soluções alternativas, porque era impossível instalar o PyFlink 1.15 ou anterior diretamente na máquina.

Melhorias nos pontos de verificação desalinhados

Apache Flink 1.15 já suportava pontos de verificação incrementais e destruição de buffer. Esses recursos podem ser usados, especialmente em combinação, para melhorar o desempenho do ponto de verificação, tornando a duração do ponto de verificação mais previsível, especialmente na presença de contrapressão. Para obter mais informações sobre esses recursos, consulte Otimize o checkpoint em seu Amazon Managed Service para aplicações Apache Flink com destruição de buffer e pontos de verificação desalinhados.

Com as versões 1.16 e 1.17, diversas alterações foram introduzidas para melhorar a estabilidade e o desempenho.

Lidando com distorção de dados

Apache Flink usa marcas d’água para dar suporte à semântica de tempo de evento. Marcas d'água são registros especiais, normalmente injetados no fluxo do operador de origem, que marcam o progresso do horário do evento para operadores como agregações de janelas de horário do evento. Uma técnica comum é atrasar as marcas d'água do último horário do evento observado, para permitir que os eventos fiquem fora de ordem, pelo menos até certo ponto.

No entanto, o uso de marcas d'água apresenta um desafio. Quando a aplicação possui múltiplas fontes, por exemplo, recebe eventos de múltiplas partições de um tópico Kafka, marcas d'água são geradas independentemente para cada partição. Internamente, cada operador espera sempre pela mesma marca d’água em todas as partições de entrada, praticamente alinhando-a na partição mais lenta. A desvantagem é que se uma das partições não estiver recebendo dados, as marcas d'água não progridem, aumentando a latência ponta a ponta. Por esta razão, um tempo limite de inatividade opcional foi introduzido em muitas fontes de streaming. Após o tempo limite configurado, a geração da marca d'água ignora qualquer partição que não receba nenhum registro e as marcas d'água podem progredir.

Você também pode enfrentar um desafio semelhante, mas oposto, se uma fonte estiver recebendo eventos muito mais rápido que as outras. As marcas d'água são alinhadas à partição mais lenta, o que significa que qualquer agregação de janelas aguardará pela marca d'água. Os registros da fonte rápida precisam esperar, sendo armazenados em buffer. Isso pode resultar no armazenamento em buffer de um volume excessivo de dados e em um crescimento incontrolável do estado do operador.

Para resolver o problema de fontes mais rápidas, começando com Apache Flink 1.17, você pode habilitar o alinhamento de marca d'água de divisões de origem (FLINK-28853). Este mecanismo, desabilitado por padrão, garante que nenhuma partição progrida suas marcas d’água muito rápido, em comparação com outras partições. Você pode vincular várias fontes, como vários tópicos de entrada, atribuindo o mesmo ID de grupo de alinhamento e configurando a duração do desvio máximo da marca d’água atual. Se uma partição específica estiver recebendo eventos muito rápido, o operador de origem fará uma pausa no consumo dessa partição até que o desvio seja reduzido abaixo do limite configurado.

Você pode habilitá-lo para cada fonte separadamente. Tudo o que você precisa é especificar um ID de grupo de alinhamento, que unirá todas as fontes que possuem o mesmo ID e a duração do desvio máximo da marca d'água mínima atual. Isso pausará o consumo da subtarefa de origem que está avançando muito rápido, até que o desvio seja inferior ao limite especificado.

O trecho de código a seguir mostra como você pode configurar o alinhamento da marca d'água de divisões de origem em uma fonte Kafka que emite marcas d'água fora de ordem limitadas:

KafkaSource<Event> kafkaSource = ...
DataStream<Event> stream = env.fromSource(
    kafkaSource,
    WatermarkStrategy.<Event>forBoundedOutOfOrderness( Duration.ofSeconds(20))
        .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1)),
    "Kafka source"));

Este recurso está disponível apenas com FLIP-217 fontes compatíveis, suportando alinhamento de marca d'água de divisões de fonte. No momento da escrita, entre os principais conectores de fontes de streaming, apenas a fonte Kafka oferece suporte a esse recurso.

Suporte direto para formato Protobuf

As APIs SQL e Table agora oferecem suporte direto Formato protobuf. Para usar este formato, você precisa gerar as classes Java do Protobuf a partir do .proto arquivos de definição de esquema e inclua-os como dependências em seu aplicativo.

O formato Protobuf funciona apenas com as APIs SQL e Table e apenas para ler ou gravar dados serializados por Protobuf de uma origem ou para um coletor. Atualmente, o Flink não oferece suporte direto ao Protobuf para serializar o estado diretamente e não oferece suporte à evolução do esquema, como faz para Avro, por exemplo. Você ainda precisa registrar um serializador personalizado com alguma sobrecarga para seu aplicativo.

Mantendo o código aberto do Apache Flink

O Apache Flink depende internamente do Akka para enviar dados entre subtarefas. Em 2022, Lightbend, a empresa por trás da Akka, anunciou uma mudança de licença para futuras versões do Akka, do Apache 2.0 a uma licença mais restritiva, e que o Akka 2.6, a versão usada pelo Apache Flink, não receberia nenhuma atualização ou correção de segurança adicional.

Embora o Akka tenha sido historicamente muito estável e não exija atualizações frequentes, esta mudança de licença representou um risco para o projeto Apache Flink. A decisão da comunidade Apache Flink foi substituir o Akka por um fork da versão 2.6, chamado Apache Pekko (FLINK-32468). Este fork manterá a licença Apache 2.0 e receberá todas as atualizações exigidas pela comunidade. Enquanto isso, a comunidade Apache Flink considerará se removerá completamente a dependência de Akka ou Pekko.

Compressão de estado

Apache Flink oferece compactação opcional (padrão: desativado) para todos os pontos de verificação e pontos de salvamento. Apache Flink identificou um bug no Flink 1.18.1, onde o estado do operador não pôde ser restaurado corretamente quando a compactação de instantâneo está habilitada. Isso pode resultar em perda de dados ou incapacidade de restauração do ponto de verificação. Para resolver isso, o Managed Service for Apache Flink fez backport do fixo que será incluído em versões futuras do Apache Flink.

Atualizações de versão no local com serviço gerenciado para Apache Flink

Se você estiver executando um aplicativo no Managed Service for Apache Flink usando o Apache Flink 1.15 ou anterior, agora poderá atualizá-lo no local para 1.18 sem perder o estado, usando o comando Interface de linha de comando da AWS (AWSCLI), Formação da Nuvem AWS or Kit de desenvolvimento em nuvem da AWS (AWS CDK) ou qualquer ferramenta que use a API da AWS.

A Atualizar aplicativo A ação da API agora oferece suporte à atualização da versão do tempo de execução do Apache Flink de um aplicativo existente de serviço gerenciado para Apache Flink. Você pode usar UpdateApplication diretamente em um aplicativo em execução.

Antes de prosseguir com a atualização in-loco, você precisa verificar e atualizar as dependências incluídas em sua aplicação, certificando-se de que sejam compatíveis com a nova versão do Apache Flink. Em particular, você precisa atualizar qualquer biblioteca, conectores e possivelmente versão do Scala do Apache Flink.

Além disso, recomendamos testar o aplicativo atualizado antes de prosseguir com a atualização. Recomendamos testar localmente e em um ambiente que não seja de produção, usando a versão de runtime de destino do Apache Flink, para garantir que nenhuma regressão foi introduzida.

E, finalmente, se o seu aplicativo tiver estado, recomendamos fazer uma instantâneo do estado do aplicativo em execução. Isso permitirá que você reverta para a versão anterior do aplicativo.

Quando estiver pronto, agora você pode usar o Atualizar aplicativo Ação da API ou aplicativo de atualização Comando da AWS CLI para atualizar a versão de tempo de execução do aplicativo e apontá-la para o novo artefato do aplicativo, JAR ou arquivo zip, com as dependências atualizadas.

Para obter informações mais detalhadas sobre o processo e a API, consulte Atualização de versão local para Apache Flink. A documentação inclui instruções passo a passo e um vídeo para guiá-lo durante o processo de atualização.

Conclusões

Nesta postagem, examinamos alguns dos novos recursos do Apache Flink, com suporte no Amazon Managed Service para Apache Flink. Esta lista não é compreensiva. O Apache Flink também introduziu alguns recursos muito promissores, como TTL em nível de operador para SQL e API de tabela [FLIP-292] e Viagem no Tempo [FLIP-308], mas ainda não são suportados pela API e ainda não estão realmente acessíveis aos usuários. Por esse motivo, decidimos não abordá-los neste post.

Com o suporte do Apache Flink 1.18, o Managed Service for Apache Flink agora oferece suporte à versão mais recente lançada do Apache Flink. Vimos alguns dos novos recursos interessantes e novos conectores disponíveis com o Apache Flink 1.18 e como o serviço gerenciado para Apache Flink ajuda a atualizar um aplicativo existente.

Você pode encontrar mais detalhes sobre lançamentos recentes no blog Apache Flink e nas notas de lançamento:

Se você é novo no Apache Flink, recomendamos nosso guia para escolher a API e a linguagem corretas e seguindo o Guia de Introdução para começar a usar o serviço gerenciado para Apache Flink.


Sobre os autores

Lorenzo NicoraLorenzo Nicora trabalha como arquiteto sênior de soluções de streaming na AWS, ajudando clientes em toda a EMEA. Ele vem construindo sistemas nativos da nuvem e com uso intensivo de dados há mais de 25 anos, trabalhando no setor financeiro por meio de consultorias e para empresas de produtos FinTech. Ele aproveitou extensivamente tecnologias de código aberto e contribuiu para vários projetos, incluindo Apache Flink.

Francisco MorilloFrancisco Morillo é arquiteto de soluções de streaming na AWS. Francisco trabalha com clientes da AWS, ajudando-os a projetar arquiteturas de análise em tempo real usando serviços da AWS, oferecendo suporte ao Amazon MSK e ao Amazon Managed Service para Apache Flink.

local_img

Inteligência mais recente

local_img