Zephyrnet-logo

Amazon Managed Service voor Apache Flink ondersteunt nu Apache Flink versie 1.18 | Amazon-webservices

Datum:

Apache Flink is een open source gedistribueerde verwerkingsengine die krachtige programmeerinterfaces biedt voor zowel stream- als batchverwerking, met eersteklas ondersteuning voor stateful verwerking en gebeurtenistijdsemantiek. Apache Flink ondersteunt meerdere programmeertalen, Java, Python, Scala, SQL en meerdere API's met verschillende abstractieniveaus, die door elkaar kunnen worden gebruikt in dezelfde applicatie.

Amazon Managed Service voor Apache Flink, dat een volledig beheerde, serverloze ervaring biedt bij het uitvoeren van Apache Flink-applicaties, ondersteunt nu Apache Flink 1.18.1, de nieuwste versie van Apache Flink op het moment van schrijven.

In dit bericht bespreken we enkele van de interessante nieuwe functies en mogelijkheden van Apache Flink, geïntroduceerd in de meest recente grote releases, 1.16, 1.17 en 1.18, en nu ondersteund in Managed Service voor Apache Flink.

Nieuwe connectoren

Voordat we ingaan op de nieuwe functionaliteiten van Apache Flink die beschikbaar zijn in versie 1.18.1, verkennen we eerst de nieuwe mogelijkheden die voortkomen uit de beschikbaarheid van veel nieuwe open source connectors.

OpenSearch

Een toegewijde OpenSearch connector is nu beschikbaar om in uw projecten te worden opgenomen, waardoor een Apache Flink-applicatie gegevens rechtstreeks naar OpenSearch kan schrijven, zonder afhankelijk te zijn van de Elasticsearch-compatibiliteitsmodus. Deze aansluiting is compatibel met Amazon OpenSearch-service bevoorraad en OpenSearch-service serverloos.

Deze nieuwe connector ondersteunt SQL- en tabel-API's, werkend met zowel Java als Python, en de DataStream-API, alleen voor Java. Out-of-the-box biedt het minimaal één keer garantie, waarbij de schrijfbewerkingen worden gesynchroniseerd met Flink-controlepunten. U kunt exact-eens-semantiek bereiken met behulp van deterministische ID's en de upsert-methode.

Standaard gebruikt de connector clientbibliotheken van OpenSearch versie 1.x. U kunt overstappen naar versie 2.x door het toevoegen van de juiste afhankelijkheden.

Amazon DynamoDB

Apache Flink-ontwikkelaars kunnen nu een speciale connector gebruiken om gegevens naar te schrijven Amazon DynamoDB. Deze connector is gebaseerd op de Apache Flink AsyncSink, ontwikkeld door AWS en nu een integraal onderdeel van het Apache Flink-project, om de implementatie van efficiënte sink-connectors te vereenvoudigen, met behulp van niet-blokkerende schrijfverzoeken en adaptieve batching.

Deze connector ondersteunt ook beide SQL en Tabel API's, Java en Python, en Data stroom API, alleen voor Java. Standaard schrijft de sink in batches om de doorvoer te optimaliseren. Een opvallend kenmerk van de SQL-versie is ondersteuning voor de PARTITIONED BY-clausule. Door een of meer sleutels op te geven, kunt u ontdubbeling aan de clientzijde realiseren, waarbij bij elke batchschrijfbewerking alleen de laatste record per sleutel wordt verzonden. Een equivalent kan worden bereikt met de DataStream API door een lijst met partitiesleutels op te geven die binnen elke batch moeten worden overschreven.

Deze aansluiting werkt alleen als gootsteen. U kunt het niet gebruiken voor het lezen van DynamoDB. Om gegevens in DynamoDB op te zoeken, moet u nog steeds een zoekopdracht implementeren met behulp van de Flink Asynchrone I/O-API of het implementeren van een aangepaste, door de gebruiker gedefinieerde functie (UDF) voor SQL.

MongoDB

Een andere interessante connector is voor MongoDB. In dit geval zijn zowel source als sink beschikbaar, voor zowel de SQL en Tabel API's en Data stroom API. De nieuwe connector maakt nu officieel deel uit van het Apache Flink-project en wordt ondersteund door de community. Deze nieuwe connector vervangt de oude die rechtstreeks door MongoDB wordt geleverd en ondersteunt alleen oudere Flink Sink- en Source-API's.

Net als bij andere connectoren voor gegevensopslag kan de bron worden gebruikt als een begrensde bron, in batchmodus of voor zoekopdrachten. De sink werkt zowel in batchmodus als in streaming en ondersteunt zowel upsert- als append-modus.

Een van de vele opvallende kenmerken van deze connector is de mogelijkheid om caching in te schakelen wanneer de bron wordt gebruikt voor zoekopdrachten. Out-of-the-box ondersteunt de gootsteen minstens één keer garanties. Wanneer een primaire sleutel is gedefinieerd, kan de sink semantiek precies één keer ondersteunen via idempotente upserts. De sink-connector ondersteunt ook precies één keer semantiek, met idempotente upserts, wanneer de primaire sleutel is gedefinieerd.

Nieuw connectorversiebeheer

Geen nieuwe functie, maar een belangrijke factor waarmee u rekening moet houden bij het updaten van een oudere Apache Flink-applicatie, is het nieuwe versiebeheer van de connector. Vanaf Apache Flink versie 1.17 zijn de meeste connectoren geëxternaliseerd vanuit de hoofddistributie van Apache Flink en volgen ze onafhankelijk versiebeheer.

Om de juiste afhankelijkheid op te nemen, moet u de artefactversie opgeven met het formulier: <connector-version>-<flink-version>

Bijvoorbeeld de nieuwste Kafka-connector, waar ook mee gewerkt wordt Amazon Managed Streaming voor Apache Kafka (Amazon MSK), op het moment van schrijven is versie 3.1.0. Als u Apache Flink 1.18 gebruikt, is de te gebruiken afhankelijkheid de volgende:

<dependency> 
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId> 
    <version>3.1.0-1.18</version>
</dependency>

Voor Amazon Kinesis, de nieuwe connectorversie is 4.2.0. De afhankelijkheid voor Apache Flink 1.18 is als volgt:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis</artifactId> 
    <version>4.2.0-1.18</version>
</dependency>

In de volgende secties bespreken we meer van de krachtige nieuwe functies die nu beschikbaar zijn in Apache Flink 1.18 en worden ondersteund in Amazon Managed Service voor Apache Flink.

SQL

In Apache Flink SQL kunnen gebruikers opgeven hints om query's samen te voegen die kunnen worden gebruikt om te suggereren dat de optimalisatie een effect heeft in het queryplan. Met name bij streamingtoepassingen opzoeken voegt zich bij worden gebruikt om een ​​tabel, die streaminggegevens vertegenwoordigt, te verrijken met gegevens die worden opgevraagd vanuit een extern systeem, meestal een database. Sinds versie 1.16 zijn er verschillende verbeteringen geïntroduceerd voor lookup-joins, waardoor u het gedrag van de join kunt aanpassen en de prestaties kunt verbeteren:

  • Cache opzoeken is een krachtige functie waarmee u de meest gebruikte records in het geheugen kunt opslaan, waardoor de druk op de database wordt verminderd. Voorheen was de opzoekcache specifiek voor bepaalde connectoren. Sinds Apache Flink 1.16 is deze optie beschikbaar voor alle connectoren die lookup (FLIP-221). Op het moment van schrijven is JDBC, Bijenkorf en HBase connectoren ondersteunen opzoekcache. Opzoekcache heeft drie beschikbare modi: FULL, voor een kleine dataset die volledig in het geheugen kan worden bewaard, PARTIAL, voor een grote dataset, alleen de meest recente records in de cache opslaan, of NONE, om de cache volledig uit te schakelen. Voor PARTIAL cache, kunt u ook het aantal rijen dat u wilt bufferen en de time-to-live configureren.
  • Asynchrone zoekopdracht is een andere functie die de prestaties aanzienlijk kan verbeteren. Async lookup biedt in Apache Flink SQL een functionaliteit die vergelijkbaar is met Asynchrone I/O beschikbaar in de DataStream-API. Hiermee kan Apache Flink nieuwe verzoeken naar de database sturen zonder de verwerkingsthread te blokkeren totdat antwoorden op eerdere zoekopdrachten zijn ontvangen. Net als bij Async I/O kunt u async lookup configureren om volgorde af te dwingen of ongeordende resultaten toe te staan, of de buffercapaciteit en de time-out aan te passen.
  • U kunt ook een configureren strategie voor opnieuw opzoeken in combinatie met PARTIAL or NONE lookup cache, om het gedrag te configureren in geval van een mislukte lookup in de externe database.

Al deze gedragingen kunnen worden gecontroleerd met behulp van een LOOKUP hint, zoals in het volgende voorbeeld, waar we een lookup-join weergeven met behulp van async lookup:

SELECT 
    /*+ LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered') */ 
    O.order_id, O.total, C.address
FROM Orders AS O 
JOIN Customers FOR SYSTEM_TIME AS OF O.proc_time AS C 
  ON O.customer_id = O.customer_id

PyFlink

In deze sectie bespreken we nieuwe verbeteringen en ondersteuning in PyFlink.

Python 3.10 ondersteuning

De nieuwste versies van Apache Flink introduceerden verschillende verbeteringen voor PyFlink-gebruikers. Eerst en vooral wordt Python 3.10 nu ondersteund en is de ondersteuning voor Python 3.6 volledig verwijderd (FLINK-29421). Managed Service voor Apache Flink gebruikt momenteel Python 3.10 runtime om PyFlink-applicaties uit te voeren.

Dichter bij featurepariteit komen

Vanuit het perspectief van de programmeer-API komt PyFlink bij elke versie dichter bij Java. De DataStream API ondersteunt nu functies zoals zij-uitvoer en uitzendstatus, en hiaten in de venster-API zijn gedicht. PyFlink ondersteunt nu ook nieuwe connectoren zoals Amazon Kinesis-gegevensstromen rechtstreeks vanuit de DataStream API.

Verbeteringen in de threadmodus

PyFlink is zeer efficiënt. De overhead van het uitvoeren van Flink API-operators in PyFlink is minimaal vergeleken met Java of Scala, omdat de runtime feitelijk de operatorimplementatie in de JVM rechtstreeks uitvoert, ongeacht de taal van uw applicatie. Maar als je een door de gebruiker gedefinieerde functie hebt, liggen de zaken enigszins anders. Een regel Python-code zo simpel als lambda x: x + 1, of zo complex als een Pandas-functie, moet worden uitgevoerd in een Python-runtime.

Standaard voert Apache Flink een Python-runtime uit op elke Taakmanager, extern aan de JVM. Elke record wordt geserialiseerd, doorgegeven aan de Python-runtime via communicatie tussen processen, gedeserialiseerd en verwerkt in de Python-runtime. Het resultaat wordt vervolgens geserialiseerd en teruggestuurd naar de JVM, waar het wordt gedeserialiseerd. Dit is de PyFlink PROCES-modus. Het is erg stabiel, maar het brengt overhead met zich mee, en in sommige gevallen kan het een prestatieknelpunt worden.

Sinds versie 1.15 ondersteunt Apache Flink ook DRAAD-modus voor PyFlink. In deze modus worden door de gebruiker gedefinieerde Python-functies binnen de JVM zelf uitgevoerd, waardoor de overhead van serialisatie/deserialisatie en communicatie tussen processen wordt geëlimineerd. THREAD-modus heeft enkele beperkingen; De THREAD-modus kan bijvoorbeeld niet worden gebruikt voor Panda's of UDAF's (door de gebruiker gedefinieerde aggregatiefuncties, bestaande uit veel invoerrecords en één uitvoerrecord), maar kan de prestaties van een PyFlink-toepassing aanzienlijk verbeteren.

Met versie 1.16 is de ondersteuning van de THREAD-modus aanzienlijk uitgebreid, waarbij ook de Python DataStream API wordt gedekt.

THREAD-modus wordt ondersteund door Managed Service voor Apache Flink en kan dat ook zijn direct ingeschakeld vanuit uw PyFlink-applicatie.

Apple Silicon-ondersteuning

Als je op Apple Silicon gebaseerde machines gebruikt om PyFlink-applicaties te ontwikkelen, die voor PyFlink 1.15 ontwikkelen, ben je waarschijnlijk enkele van de bekende Python-afhankelijkheidsproblemen op Apple Silicon tegengekomen. Deze problemen zijn eindelijk opgelost (FLINK-25188). Deze beperkingen hadden geen invloed op PyFlink-applicaties die op Managed Service voor Apache Flink draaiden. Als je vóór versie 1.16 een PyFlink-applicatie wilde ontwikkelen op een machine met een M1-, M2- of M3-chipset, moest je een aantal workarounds, omdat het onmogelijk was om PyFlink 1.15 of eerder rechtstreeks op de machine te installeren.

Niet-uitgelijnde controlepuntverbeteringen

Apache Flink 1.15 ondersteunde al incrementele controlepunten en bufferdebloating. Deze functies kunnen, vooral in combinatie, worden gebruikt om de prestaties van controlepunten te verbeteren, waardoor de duur van controlepunten voorspelbaarder wordt, vooral als er sprake is van tegendruk. Voor meer informatie over deze functies, zie Optimaliseer controlepunten in uw Amazon Managed Service voor Apache Flink-applicaties met bufferdebloating en niet-uitgelijnde controlepunten.

Met versies 1.16 en 1.17 zijn verschillende wijzigingen geïntroduceerd om de stabiliteit en prestaties te verbeteren.

Omgaan met gegevensscheefheid

Apache Flink gebruikt watermerken om de semantiek van gebeurtenissen te ondersteunen. Watermerken zijn speciale records, die normaal gesproken in de stroom van de bronoperator worden geïnjecteerd, en die de voortgang van de gebeurtenistijd markeren voor operators, zoals aggregaties van gebeurtenistijdvensters. Een veelgebruikte techniek is het uitstellen van watermerken vanaf het laatst waargenomen tijdstip van de gebeurtenis, om ervoor te zorgen dat gebeurtenissen, althans tot op zekere hoogte, niet in de juiste volgorde staan.

Het gebruik van watermerken brengt echter een uitdaging met zich mee. Wanneer de toepassing meerdere bronnen heeft, bijvoorbeeld gebeurtenissen ontvangt van meerdere partities van een Kafka-onderwerp, worden voor elke partitie afzonderlijk watermerken gegenereerd. Intern wacht elke operator altijd op hetzelfde watermerk op alle invoerpartities, waardoor dit praktisch op de langzaamste partitie wordt uitgelijnd. Het nadeel is dat als een van de partities geen gegevens ontvangt, de watermerken niet vooruitgaan, waardoor de end-to-end latentie toeneemt. Om deze reden is een optionele time-out voor inactiviteit is geïntroduceerd in veel streamingbronnen. Na de geconfigureerde time-out negeert het genereren van watermerken elke partitie die geen record ontvangt, en kunnen watermerken doorgaan.

Je kunt ook met een soortgelijke, maar tegengestelde uitdaging te maken krijgen als de ene bron gebeurtenissen veel sneller ontvangt dan de andere. Watermerken worden uitgelijnd op de langzaamste partitie, wat betekent dat elke vensteraggregatie op het watermerk wacht. Opnames van de snelle bron moeten wachten en worden gebufferd. Dit kan resulteren in het bufferen van een buitensporig gegevensvolume en een oncontroleerbare groei van de operatorstatus.

Om het probleem van snellere bronnen aan te pakken, kunt u, te beginnen met Apache Flink 1.17, de uitlijning van de bronsplitsingen met watermerken inschakelen (FLINK-28853). Dit mechanisme, standaard uitgeschakeld, zorgt ervoor dat geen enkele partitie zijn watermerken te snel voortzet, vergeleken met andere partities. U kunt meerdere bronnen samenbinden, zoals meerdere invoeronderwerpen, dezelfde uitlijningsgroep-ID toewijzen en de duur van de maximale afwijking van het huidige watermerk configureren. Als een specifieke partitie te snel gebeurtenissen ontvangt, pauzeert de bronoperator het gebruik van die partitie totdat de drift tot onder de geconfigureerde drempelwaarde is teruggebracht.

U kunt dit voor elke bron afzonderlijk inschakelen. Het enige wat u nodig heeft, is een uitlijningsgroep-ID opgeven, die alle bronnen met dezelfde ID samenbindt, en de duur van de maximale afwijking ten opzichte van het huidige minimale watermerk. Hierdoor wordt het verbruik van de bronsubtaak gepauzeerd die te snel vordert, totdat de drift lager is dan de opgegeven drempelwaarde.

Het volgende codefragment laat zien hoe u watermerkuitlijning van bronsplitsingen kunt instellen op een Kafka-bron die watermerken met begrensde niet-volgorde uitzendt:

KafkaSource<Event> kafkaSource = ...
DataStream<Event> stream = env.fromSource(
    kafkaSource,
    WatermarkStrategy.<Event>forBoundedOutOfOrderness( Duration.ofSeconds(20))
        .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1)),
    "Kafka source"));

Deze functie is alleen beschikbaar bij FLIP-217 compatibele bronnen, ondersteunt watermerkuitlijning van bronsplitsingen. Op het moment van schrijven ondersteunt alleen de Kafka-bron deze functie van de belangrijkste streamingbronconnectors.

Directe ondersteuning voor Protobuf-formaat

De SQL- en Table-API's ondersteunen nu rechtstreeks Protobuf-formaat. Om dit formaat te gebruiken, moet u de Protobuf Java-klassen genereren uit de .proto schemadefinitiebestanden en neem deze op als afhankelijkheden in uw toepassing.

Het Protobuf-formaat werkt alleen met de SQL- en Table-API's en alleen voor het lezen of schrijven van Protobuf-seriële gegevens van een bron of naar een sink. Momenteel ondersteunt Flink Protobuf niet rechtstreeks om de status rechtstreeks te serialiseren en ondersteunt het geen schema-evolutie, zoals wel het geval is voor euro, Bijvoorbeeld. U moet zich nog registreren aangepaste serialisator met wat overhead voor uw toepassing.

Apache Flink open source houden

Apache Flink vertrouwt intern op Akka voor het verzenden van gegevens tussen subtaken. In 2022, Lightbend, het bedrijf achter Akka, kondigde een licentiewijziging aan voor toekomstige Akka-versies, van Apache 2.0 naar een restrictievere licentie, en dat Akka 2.6, de versie die door Apache Flink wordt gebruikt, geen verdere beveiligingsupdate of oplossing zou ontvangen.

Hoewel Akka historisch gezien zeer stabiel is en geen frequente updates vereist, vormde deze licentiewijziging een risico voor het Apache Flink-project. De beslissing van de Apache Flink-gemeenschap was om Akka te vervangen door een fork van versie 2.6, genaamd Apache Pekko (FLINK-32468). Deze fork behoudt de Apache 2.0-licentie en ontvangt alle vereiste updates van de community. In de tussentijd zal de Apache Flink-gemeenschap overwegen of ze de afhankelijkheid van Akka of Pekko volledig wil wegnemen.

Staatscompressie

Apache Flink biedt optionele compressie (standaard: uit) voor alle checkpoints en savepoints. Apache Flink heeft een bug geïdentificeerd in Flink 1.18.1 waar de operatorstatus niet correct kon worden hersteld wanneer snapshotcompressie is ingeschakeld. Dit kan resulteren in gegevensverlies of het onvermogen om vanaf het controlepunt te herstellen. Om dit op te lossen heeft Managed Service voor Apache Flink de repareren die zullen worden opgenomen in toekomstige versies van Apache Flink.

In-place versie-upgrades met Managed Service voor Apache Flink

Als u momenteel een applicatie uitvoert op Managed Service voor Apache Flink met Apache Flink 1.15 of ouder, kunt u deze nu ter plekke upgraden naar 1.18 zonder de status te verliezen, met behulp van de AWS-opdrachtregelinterface (AWS-CLI), AWS CloudFormatie or AWS Cloud-ontwikkelingskit (AWS CDK), of een tool die de AWS API gebruikt.

De Applicatie updaten API-actie ondersteunt nu het bijwerken van de Apache Flink-runtimeversie van een bestaande Managed Service voor Apache Flink-applicatie. U kunt UpdateApplication rechtstreeks op een actieve applicatie gebruiken.

Voordat u doorgaat met de interne update, moet u de afhankelijkheden in uw applicatie verifiëren en bijwerken, en ervoor zorgen dat ze compatibel zijn met de nieuwe Apache Flink-versie. In het bijzonder moet u elke Apache Flink-bibliotheek, connectoren en mogelijk Scala-versie bijwerken.

We raden u ook aan de bijgewerkte applicatie te testen voordat u doorgaat met de update. We raden aan om lokaal en in een niet-productieomgeving te testen, met behulp van de doel-Apache Flink-runtimeversie, om er zeker van te zijn dat er geen regressies zijn geïntroduceerd.

En tot slot, als uw aanvraag stateful is, raden we u aan een momentopname van de actieve applicatiestatus. Hiermee kunt u teruggaan naar de vorige applicatieversie.

Als u er klaar voor bent, kunt u nu de Applicatie updaten API-actie of update-applicatie AWS CLI-opdracht om de runtimeversie van de applicatie bij te werken en deze naar het nieuwe applicatieartefact, JAR- of zip-bestand te verwijzen, met de bijgewerkte afhankelijkheden.

Voor meer gedetailleerde informatie over het proces en de API, zie In-place versie-upgrade voor Apache Flink. De documentatie bevat stapsgewijze instructies en een video die u door het upgradeproces leidt.

Conclusies

In dit bericht hebben we enkele nieuwe functies van Apache Flink onderzocht, ondersteund in Amazon Managed Service voor Apache Flink. Deze lijst is niet volledig. Apache Flink introduceerde ook een aantal veelbelovende functies, zoals TTL op operatorniveau voor de SQL en Table API [FLIP-292] en tijdreizen [FLIP-308], maar deze worden nog niet ondersteund door de API, en nog niet echt toegankelijk voor gebruikers. Om deze reden hebben we besloten deze niet in dit bericht te behandelen.

Met de ondersteuning van Apache Flink 1.18 ondersteunt Managed Service voor Apache Flink nu de laatst uitgebrachte Apache Flink-versie. We hebben enkele van de interessante nieuwe functies en nieuwe connectoren gezien die beschikbaar zijn in Apache Flink 1.18 en hoe Managed Service voor Apache Flink u helpt een bestaande applicatie te upgraden.

U kunt meer details over recente releases vinden op de Apache Flink-blog en release-opmerkingen:

Als u nieuw bent bij Apache Flink, raden wij u onze gids voor het kiezen van de juiste API en taal en het volgen van de aan de slag-gids om Managed Service voor Apache Flink te gaan gebruiken.


Over de auteurs

Lorenzo NicoraLorenzo Nicora werkt als Senior Streaming Solution Architect bij AWS en helpt klanten in heel EMEA. Hij bouwt al meer dan 25 jaar cloud-native, data-intensieve systemen en werkt in de financiële sector, zowel via adviesbureaus als voor FinTech-productbedrijven. Hij heeft op grote schaal gebruik gemaakt van open-sourcetechnologieën en heeft bijgedragen aan verschillende projecten, waaronder Apache Flink.

Francisco MorilloFrancisco Morillo is een streamingoplossingenarchitect bij AWS. Francisco werkt samen met AWS-klanten en helpt hen bij het ontwerpen van realtime analyse-architecturen met behulp van AWS-services, ter ondersteuning van Amazon MSK en Amazon Managed Service voor Apache Flink.

spot_img

Laatste intelligentie

spot_img