Logo Zephyrnet

Zorganizuj kompleksowy potok ETL przy użyciu Amazon S3, AWS Glue i Amazon Redshift Serverless z Amazon MWAA | Usługi internetowe Amazona

Data:

Przepływy pracy zarządzane przez Amazon dla Apache Airflow (Amazon MWAA) to zarządzana usługa orkiestracji dla Przepływ powietrza Apache których można używać do konfigurowania i obsługi potoków danych w chmurze na dużą skalę. Apache Airflow to narzędzie typu open source służące do programowego tworzenia, planowania i monitorowania sekwencji procesów i zadań, zwanych przepływów pracy. Dzięki Amazon MWAA możesz używać Apache Airflow i Python do tworzenia przepływów pracy bez konieczności zarządzania podstawową infrastrukturą pod kątem skalowalności, dostępności i bezpieczeństwa.

Korzystając z wielu kont AWS, organizacje mogą skutecznie skalować swoje obciążenia i zarządzać ich złożonością w miarę ich wzrostu. Takie podejście zapewnia solidny mechanizm łagodzenia potencjalnego wpływu zakłóceń lub awarii, zapewniając ciągłość działania krytycznych obciążeń. Dodatkowo umożliwia optymalizację kosztów poprzez dopasowywanie zasobów do konkretnych przypadków użycia, zapewniając dobrą kontrolę wydatków. Izolując obciążenia o określonych wymaganiach dotyczących bezpieczeństwa lub zgodności, organizacje mogą zachować najwyższy poziom prywatności i bezpieczeństwa danych. Co więcej, możliwość uporządkowania wielu kont AWS pozwala na dostosowanie procesów biznesowych i zasobów do unikalnych wymagań operacyjnych, regulacyjnych i budżetowych. Takie podejście promuje wydajność, elastyczność i skalowalność, umożliwiając dużym przedsiębiorstwom zaspokajanie ich zmieniających się potrzeb i osiąganie celów.

W tym poście pokazano, jak zaaranżować kompleksowe wyodrębnianie, przekształcanie i ładowanie potoku (ETL) przy użyciu Usługa Amazon Simple Storage (Amazonka S3), Klej AWS, Bezserwerowe Amazon Redshift z Amazonem MWAA.

Omówienie rozwiązania

W tym poście rozważamy przypadek użycia, w którym zespół inżynierów danych chce zbudować proces ETL i zapewnić użytkownikom końcowym najlepsze doświadczenia, gdy chcą zapytać o najnowsze dane po dodaniu nowych nieprzetworzonych plików do Amazon S3 w centrali konto (konto A na poniższym schemacie architektury). Zespół inżynierów danych chce oddzielić surowe dane na własne konto AWS (konto B na diagramie) w celu zwiększenia bezpieczeństwa i kontroli. Chcą także wykonywać prace związane z przetwarzaniem i transformacją danych na swoim własnym koncie (konto B), aby rozdzielić obowiązki i zapobiec niezamierzonym zmianom w surowych danych źródłowych znajdujących się na rachunku centralnym (konto A). Takie podejście pozwala zespołowi przetwarzać surowe dane pobrane z Konta A na Konto B, które jest dedykowane do zadań związanych z obsługą danych. Dzięki temu surowe i przetworzone dane można w razie potrzeby bezpiecznie przechowywać oddzielnie na wielu kontach, co zapewnia lepsze zarządzanie danymi i bezpieczeństwo.

Nasze rozwiązanie wykorzystuje kompleksowy potok ETL zaaranżowany przez Amazon MWAA, który wyszukuje nowe pliki przyrostowe w lokalizacji Amazon S3 na koncie A, gdzie obecne są surowe dane. Odbywa się to poprzez wywołanie zadań ETL AWS Glue i zapisanie do obiektów danych w klastrze Redshift Serverless na koncie B. Następnie rozpoczyna się działanie potoku procedury składowane i polecenia SQL w systemie Redshift Serverless. Gdy zapytania zakończą się, an ROZŁADOWAĆ operacja jest wywoływana z hurtowni danych Redshift do segmentu S3 na koncie A.

Ponieważ bezpieczeństwo jest ważne, w tym poście opisano również, jak skonfigurować połączenie Airflow za pomocą Menedżer tajemnic AWS aby uniknąć przechowywania poświadczeń bazy danych w połączeniach i zmiennych Airflow.

Poniższy diagram ilustruje przegląd architektury komponentów zaangażowanych w orkiestrację przepływu pracy.

Przepływ pracy składa się z następujących elementów:

  • Źródłowy i docelowy zasobnik S3 znajdują się na koncie centralnym (konto A), natomiast Amazon MWAA, AWS Glue i Amazon Redshift znajdują się na innym koncie (konto B). Skonfigurowano dostęp między kontami pomiędzy segmentami S3 na koncie A i zasobami na koncie B, aby móc ładować i rozładowywać dane.
  • Na drugim koncie Amazon MWAA jest hostowany w jednym VPC, a Redshift Serverless w innym VPC, które są połączone poprzez peering VPC. Grupa robocza Redshift Serverless jest zabezpieczona w prywatnych podsieciach w trzech strefach dostępności.
  • Sekrety takie jak nazwa użytkownika, hasło, port DB i region AWS dla Redshift Serverless są przechowywane w Menedżerze sekretów.
  • Punkty końcowe VPC są tworzone dla Amazon S3 i Secrets Manager w celu interakcji z innymi zasobami.
  • Zwykle inżynierowie danych tworzą acykliczny wykres ukierunkowany przepływem powietrza (DAG) i przesyłają swoje zmiany do GitHub. W przypadku akcji GitHub są one wdrażane w zasobniku S3 na koncie B (w tym poście przesyłamy pliki bezpośrednio do zasobnika S3). Wiadro S3 przechowuje pliki związane z przepływem powietrza, takie jak pliki DAG, requirements.txt pliki i wtyczki. Skrypty i zasoby AWS Glue ETL są przechowywane w innym wiadrze S3. To oddzielenie pomaga utrzymać organizację i uniknąć zamieszania.
  • Airflow DAG wykorzystuje różnych operatorów, czujniki, połączenia, zadania i reguły do ​​uruchamiania potoku danych w razie potrzeby.
  • Dzienniki Airflow są zalogowane Amazon Cloud Watchi alerty można skonfigurować do zadań monitorowania. Aby uzyskać więcej informacji, zobacz Monitorowanie pulpitów nawigacyjnych i alarmów w Amazon MWAA.

Wymagania wstępne

Ponieważ to rozwiązanie koncentruje się wokół wykorzystania usługi Amazon MWAA do koordynowania potoku ETL, należy wcześniej skonfigurować pewne podstawowe zasoby na kontach. W szczególności musisz utworzyć segmenty i foldery S3, zasoby AWS Glue i zasoby Redshift Serverless na odpowiednich kontach przed wdrożeniem pełnej integracji przepływu pracy za pomocą Amazon MWAA.

Wdróż zasoby na koncie A przy użyciu AWS CloudFormation

Na koncie A uruchom dostarczone Tworzenie chmury AWS stos, aby utworzyć następujące zasoby:

  • Źródłowe i docelowe zasobniki i foldery S3. Zgodnie z najlepszą praktyką struktury zasobników wejściowych i wyjściowych są formatowane przy użyciu partycjonowania w stylu gałęzi jako s3://<bucket>/products/YYYY/MM/DD/.
  • Przykładowy zbiór danych o nazwie products.csv, którego używamy w tym poście.

Prześlij zadanie klejenia AWS do Amazon S3 na koncie B

Na koncie B utwórz lokalizację Amazon S3 o nazwie aws-glue-assets-<account-id>-<region>/scripts (jeśli nie występuje). Zastąp parametry identyfikatora konta i regionu w pliku sample_glue_job.py skrypt i prześlij plik zadania AWS Glue do lokalizacji Amazon S3.

Wdróż zasoby na koncie B za pomocą AWS CloudFormation

Na koncie B uruchom dostarczony szablon stosu CloudFormation, aby utworzyć następujące zasoby:

  • Łyżka S3 airflow-<username>-bucket do przechowywania plików związanych z Airflow o następującej strukturze:
    • dagi – Folder plików DAG.
    • wtyczki – Plik dowolnych niestandardowych lub społecznościowych wtyczek Airflow.
    • wymagania - requirements.txt plik dla dowolnych pakietów Pythona.
    • skrypty – Wszelkie skrypty SQL używane w DAG.
    • dane – Wszelkie zbiory danych wykorzystywane w DAG.
  • Środowisko bezserwerowe Redshift. Nazwa grupy roboczej i przestrzeni nazw są poprzedzone prefiksem sample.
  • Środowisko AWS Glue, które zawiera:
    • Klej AWS crawler, który przeszukuje dane z zasobnika źródłowego S3 sample-inp-bucket-etl-<username> na koncie A.
    • Baza danych tzw products_db w Katalogu danych kleju AWS.
    • ELT praca nazywa sample_glue_job. To zadanie może czytać pliki z products tabeli w katalogu danych i załaduj dane do tabeli przesunięcia ku czerwieni products.
  • Punkt końcowy bramy VPC do Amazon S3.
  • Środowisko Amazon MWAA. Szczegółowe instrukcje dotyczące tworzenia środowiska Amazon MWAA przy użyciu konsoli Amazon MWAA można znaleźć w artykule Przedstawiamy przepływy pracy zarządzane przez Amazon dla Apache Airflow (MWAA).

uruchom stos 1

Twórz zasoby Amazon Redshift

Utwórz dwie tabele i procedurę składowaną w grupie roboczej Redshift Serverless za pomocą produkty.sql plik.

W tym przykładzie tworzymy dwie tabele o nazwie products i products_f. Nazwa procedury składowanej to sp_products.

Skonfiguruj uprawnienia przepływu powietrza

Po pomyślnym utworzeniu środowiska Amazon MWAA status będzie wyświetlany jako Dostępny, Wybierać Otwórz interfejs przepływu powietrza , aby wyświetlić interfejs Airflow. DAG są automatycznie synchronizowane z segmentu S3 i widoczne w interfejsie użytkownika. Jednak na tym etapie w folderze S3 nie ma żadnych DAG-ów.

Dodaj zasady zarządzane przez klienta AmazonMWAAFullConsoleAccess, który przyznaje użytkownikom Airflow uprawnienia dostępu AWS Zarządzanie tożsamością i dostępem (IAM) i dołącz tę politykę do roli Amazon MWAA. Aby uzyskać więcej informacji, zobacz Dostęp do środowiska Amazon MWAA.

Zasady powiązane z rolą Amazon MWAA mają pełny dostęp i można ich używać wyłącznie do celów testowych w bezpiecznym środowisku testowym. W przypadku wdrożeń produkcyjnych postępuj zgodnie z zasadą najmniejszych uprawnień.

Skonfiguruj środowisko

W tej sekcji opisano kroki konfigurowania środowiska. Proces obejmuje następujące etapy wysokiego poziomu:

  1. Zaktualizuj wszystkich niezbędnych dostawców.
  2. Skonfiguruj dostęp dla wielu kont.
  3. Ustanów połączenie równorzędne VPC pomiędzy Amazon MWAA VPC i Amazon Redshift VPC.
  4. Skonfiguruj Menedżera sekretów do integracji z Amazon MWAA.
  5. Zdefiniuj połączenia przepływu powietrza.

Zaktualizuj dostawców

Wykonaj kroki opisane w tej sekcji, jeśli Twoja wersja Amazon MWAA jest starsza niż 2.8.1 (najnowsza wersja w momencie pisania tego posta).

Dostawcy to pakiety utrzymywane przez społeczność i zawierające wszystkich podstawowych operatorów, haczyki i czujniki dla danej usługi. Dostawca Amazon jest używany do interakcji z usługami AWS, takimi jak Amazon S3, Amazon Redshift Serverless, AWS Glue i nie tylko. W ramach dostawcy Amazon istnieje ponad 200 modułów.

Chociaż wersja Airflow obsługiwana w Amazon MWAA to 2.6.3 i jest dołączona do pakietu dostarczonego przez Amazon w wersji 8.2.0, obsługa Amazon Redshift Serverless została dodana dopiero w pakiecie dostarczonym przez Amazon w wersji 8.4.0. Ponieważ domyślna wersja dostawcy w pakiecie jest starsza niż w momencie wprowadzenia obsługi Redshift Serverless, wersja dostawcy musi zostać uaktualniona, aby móc korzystać z tej funkcjonalności.

Pierwszym krokiem jest aktualizacja pliku ograniczeń i requirements.txt plik z poprawnymi wersjami. Odnosić się do Określanie nowszych pakietów dostawców aby zapoznać się z instrukcjami aktualizacji pakietu dostawcy Amazon.

  1. Określ wymagania w następujący sposób:
    --constraint "/usr/local/airflow/dags/constraints-3.10-mod.txt"
    apache-airflow-providers-amazon==8.4.0

  2. Zaktualizuj wersję w pliku ograniczeń do wersji 8.4.0 lub nowszej.
  3. Dodaj ograniczenia-3.11-updated.txt plik do /dags teczka.

Odnosić się do Wersje Apache Airflow w Amazon Managed Workflows dla Apache Airflow dla poprawnych wersji pliku ograniczeń w zależności od wersji Airflow.

  1. Przejdź do środowiska Amazon MWAA i wybierz Edytuj.
  2. Pod Kod DAG w Amazon S3, Dla Plik wymagań, wybierz najnowszą wersję.
  3. Dodaj Zapisz.

Spowoduje to aktualizację środowiska i włączenie nowych dostawców.

  1. Aby zweryfikować wersję dostawcy, przejdź do Dostawcy pod Admin tabela.

Wersja pakietu dostawcy Amazon powinna być 8.4.0, jak pokazano na poniższym zrzucie ekranu. Jeśli nie, wystąpił błąd podczas ładowania requirements.txt. Aby debugować błędy, przejdź do konsoli CloudWatch i otwórz plik requirements_install_ip Zaloguj Się Rejestruj strumienie, gdzie wyszczególnione są błędy. Odnosić się do Włączanie logów na konsoli Amazon MWAA by uzyskać więcej szczegółów.

Skonfiguruj dostęp dla wielu kont

Aby uzyskać dostęp do zasobników S3 w celu ładowania i rozładowywania danych, musisz skonfigurować zasady i role dotyczące wielu kont między kontami A i kontami B. Wykonaj następujące kroki:

  1. Na koncie A skonfiguruj zasady zasobnika dla zasobnika sample-inp-bucket-etl-<username> aby przyznać uprawnienia do ról AWS Glue i Amazon MWAA na koncie B dla obiektów w wiadrze sample-inp-bucket-etl-<username>:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": [
                        "arn:aws:iam::<account-id-of- AcctB>:role/service-role/<Glue-role>",
                        "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
                    ]
                },
                "Action": [
                    "s3:GetObject",
    "s3:PutObject",
    		   "s3:PutObjectAcl",
    		   "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-inp-bucket-etl-<username>/*",
                    "arn:aws:s3:::sample-inp-bucket-etl-<username>"
                ]
            }
        ]
    }
    

  2. Podobnie skonfiguruj zasady segmentu dla segmentu sample-opt-bucket-etl-<username> aby przyznać uprawnienia rolom Amazon MWAA na koncie B w celu umieszczenia obiektów w tym zasobniku:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
                },
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl",
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>/*",
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>"
                ]
            }
        ]
    }
    

  3. Na koncie A utwórz politykę uprawnień o nazwie policy_for_roleA, co pozwala na niezbędne działania Amazon S3 na wiadrze wyjściowym:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "kms:Decrypt",
                    "kms:Encrypt",
                    "kms:GenerateDataKey"
                ],
                "Resource": [
                    "<KMS_KEY_ARN_Used_for_S3_encryption>"
                ]
            },
            {
                "Sid": "VisualEditor1",
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:GetBucketAcl",
                    "s3:GetBucketCors",
                    "s3:GetEncryptionConfiguration",
                    "s3:GetBucketLocation",
                    "s3:ListAllMyBuckets",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:ListBucketVersions",
                    "s3:ListMultipartUploadParts"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>",
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>/*"
                ]
            }
        ]
    }

  4. Utwórz nową rolę IAM o nazwie RoleA z kontem B jako rolą zaufanej jednostki i dodaj tę politykę do tej roli. Dzięki temu konto B może przyjąć rolę A w celu wykonania niezbędnych działań Amazon S3 w zasobniku wyjściowym.
  5. Na koncie B utwórz politykę uprawnień o nazwie s3-cross-account-access z uprawnieniami dostępu do obiektów w wiadrze sample-inp-bucket-etl-<username>, który znajduje się na koncie A.
  6. Dodaj tę politykę do roli AWS Glue i Amazon MWAA:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl"
                ],
                "Resource": "arn:aws:s3:::sample-inp-bucket-etl-<username>/*"
            }
        ]
    }

  7. Na koncie B utwórz politykę uprawnień policy_for_roleB określając Konto A jako podmiot zaufany. Poniżej przedstawiono politykę zaufania, jaką należy przyjąć RoleA na koncie A:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "CrossAccountPolicy",
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": "arn:aws:iam::<account-id-of-AcctA>:role/RoleA"
            }
        ]
    }

  8. Utwórz nową rolę IAM o nazwie RoleB z Amazon Redshift jako typem zaufanej jednostki i dodaj tę politykę do roli. To pozwala RoleB zakładać RoleA na koncie A, a także do przejęcia przez Amazon Redshift.
  9. Dołączać RoleB do przestrzeni nazw Redshift Serverless, dzięki czemu Amazon Redshift może zapisywać obiekty w zasobniku wyjściowym S3 na koncie A.
  10. Dołącz politykę policy_for_roleB do roli Amazon MWAA, która umożliwia Amazon MWAA dostęp do zasobnika wyjściowego na koncie A.

Odnosić się do Jak zapewnić dostęp dla wielu kont do obiektów znajdujących się w zasobnikach Amazon S3? aby uzyskać więcej informacji na temat konfigurowania dostępu między kontami do obiektów w Amazon S3 z AWS Glue i Amazon MWAA. Odnosić się do Jak KOPIOWAĆ lub WYŁADUĆ dane z Amazon Redshift do segmentu Amazon S3 na innym koncie? aby uzyskać więcej informacji na temat konfigurowania ról do przesyłania danych z Amazon Redshift do Amazon S3 z Amazon MWAA.

Skonfiguruj połączenie równorzędne VPC pomiędzy Amazon MWAA i Amazon Redshift VPC

Ponieważ Amazon MWAA i Amazon Redshift znajdują się w dwóch oddzielnych VPC, musisz skonfigurować między nimi komunikację równorzędną VPC. Należy dodać trasę do tabel tras powiązanych z podsieciami obu usług. Odnosić się do Pracuj z połączeniami równorzędnymi VPC aby uzyskać szczegółowe informacje na temat komunikacji równorzędnej VPC.

Upewnij się, że zakres CIDR Amazon MWAA VPC jest dozwolony w grupie zabezpieczeń Redshift, a zakres CIDR Amazon Redshift VPC jest dozwolony w grupie zabezpieczeń Amazon MWAA, jak pokazano na poniższym zrzucie ekranu.

Jeśli którykolwiek z powyższych kroków zostanie skonfigurowany nieprawidłowo, w przebiegu DAG prawdopodobnie wystąpi błąd „Przekroczono limit czasu połączenia”.

Skonfiguruj połączenie Amazon MWAA z Menedżerem sekretów

Gdy potok Amazon MWAA jest skonfigurowany do korzystania z Menedżera sekretów, najpierw będzie szukać połączeń i zmiennych w alternatywnym zapleczu (np. Menedżerze sekretów). Jeśli alternatywny backend zawiera potrzebną wartość, jest ona zwracana. W przeciwnym razie sprawdzi bazę danych metadanych pod kątem wartości i zamiast tego zwróci ją. Więcej szczegółów znajdziesz w Konfigurowanie połączenia Apache Airflow przy użyciu klucza tajnego AWS Secrets Manager.

Wykonaj następujące kroki:

  1. Skonfiguruj plik Punkt końcowy VPC aby połączyć Amazon MWAA i Menedżera sekretów (com.amazonaws.us-east-1.secretsmanager).

Dzięki temu usługa Amazon MWAA może uzyskać dostęp do poświadczeń przechowywanych w Menedżerze sekretów.

  1. Aby zapewnić Amazon MWAA pozwolenie na dostęp do tajnych kluczy Secrets Manager, dodaj zasadę o nazwie SecretsManagerReadWrite do roli IAM środowiska.
  2. Aby utworzyć zaplecze Menedżera sekretów jako opcję konfiguracji Apache Airflow, przejdź do opcji konfiguracji Airflow, dodaj następujące pary klucz-wartość i zapisz swoje ustawienia.

To konfiguruje Airflow tak, aby szukał parametrów połączenia i zmiennych w pliku airflow/connections/* i airflow/variables/* ścieżki:

secrets.backend: airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend secrets.backend_kwargs: {"connections_prefix": "przepływ powietrza/połączenia", "variables_prefix": "przepływ powietrza/zmienne"}

  1. Aby wygenerować ciąg URI połączenia Airflow, przejdź do Chmura AWS i wejdź do powłoki Pythona.
  2. Uruchom następujący kod, aby wygenerować ciąg URI połączenia:
    import urllib.parse
    conn_type = 'redshift'
    host = 'sample-workgroup.<account-id-of-AcctB>.us-east-1.redshift-serverless.amazonaws.com' #Specify the Amazon Redshift workgroup endpoint
    port = '5439'
    login = 'admin' #Specify the username to use for authentication with Amazon Redshift
    password = '<password>' #Specify the password to use for authentication with Amazon Redshift
    role_arn = urllib.parse.quote_plus('arn:aws:iam::<account_id>:role/service-role/<MWAA-role>')
    database = 'dev'
    region = 'us-east-1' #YOUR_REGION
    conn_string = '{0}://{1}:{2}@{3}:{4}?role_arn={5}&database={6}&region={7}'.format(conn_type, login, password, host, port, role_arn, database, region)
    print(conn_string)
    

Ciąg połączenia powinien zostać wygenerowany w następujący sposób:

redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=<region>

  1. Dodaj połączenie w Menedżerze sekretów, używając następującego polecenia w pliku Interfejs wiersza poleceń AWS (interfejs wiersza poleceń AWS).

Można to również zrobić z konsoli Menedżera sekretów. Zostanie to dodane w Menedżerze sekretów jako zwykły tekst.

aws secretsmanager create-secret --name airflow/connections/secrets_redshift_connection --description "Apache Airflow to Redshift Cluster" --secret-string "redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=us-east-1" --region=us-east-1

Skorzystaj z połączenia airflow/connections/secrets_redshift_connection w DAG-u. Po uruchomieniu DAG będzie szukać tego połączenia i pobiera sekrety z Menedżera sekretów. W przypadku RedshiftDataOperator, przekaż secret_arn jako parametr zamiast nazwy połączenia.

Możesz także dodawać wpisy tajne za pomocą konsoli Menedżera kluczy tajnych jako pary klucz-wartość.

  1. Dodaj kolejny sekret w Menedżerze sekretów i zapisz go jako airflow/connections/redshift_conn_test.

Utwórz połączenie Airflow za pośrednictwem bazy danych metadanych

Połączenia można także tworzyć w interfejsie użytkownika. W takim przypadku szczegóły połączenia zostaną zapisane w bazie danych metadanych Airflow. Jeśli środowisko Amazon MWAA nie jest skonfigurowane do korzystania z zaplecza Menedżera sekretów, sprawdzi wartość w bazie danych metadanych i zwróci ją. Możesz utworzyć połączenie Airflow za pomocą interfejsu użytkownika, interfejsu wiersza polecenia AWS lub interfejsu API. W tej sekcji pokazujemy, jak utworzyć połączenie za pomocą interfejsu użytkownika Airflow.

  1. W razie zamówieenia projektu Identyfikator połączenia, wprowadź nazwę połączenia.
  2. W razie zamówieenia projektu Rodzaj zasilaniawybierz Amazonka Przesunięcie ku czerwieni.
  3. W razie zamówieenia projektu Gospodarz, wprowadź punkt końcowy Redshift (bez portu i bazy danych) dla Redshift Serverless.
  4. W razie zamówieenia projektu Baza danych, wchodzić dev.
  5. W razie zamówieenia projektu Użytkownik, wprowadź swoją nazwę użytkownika administratora.
  6. W razie zamówieenia projektu Hasło, Wprowadź hasło.
  7. W razie zamówieenia projektu Port, użyj portu 5439.
  8. W razie zamówieenia projektu Extra, Ustaw region i timeout parametry.
  9. Przetestuj połączenie, a następnie zapisz ustawienia.

Utwórz i uruchom DAG

W tej sekcji opisujemy, jak utworzyć DAG przy użyciu różnych komponentów. Po utworzeniu i uruchomieniu DAG możesz zweryfikować wyniki, wysyłając zapytania do tabel Redshift i sprawdzając docelowe segmenty S3.

Utwórz DAG

W Airflow potoki danych są zdefiniowane w kodzie Pythona jako DAG. Tworzymy DAG, który składa się z różnych operatorów, czujników, połączeń, zadań i reguł:

  • DAG rozpoczyna się od wyszukania plików źródłowych w wiadrze S3 sample-inp-bucket-etl-<username> na koncie A na bieżący dzień S3KeySensor. S3KeySensor służy do oczekiwania na obecność jednego lub wielu kluczy w segmencie S3.
    • Na przykład nasz wiadro S3 jest podzielony na partycje s3://bucket/products/YYYY/MM/DD/, więc nasz czujnik powinien sprawdzić foldery z aktualną datą. Wyprowadziliśmy aktualną datę z DAG i przekazaliśmy ją do S3KeySensor, który szuka nowych plików w folderze bieżącego dnia.
    • Ustawiamy również wildcard_match as True, który umożliwia wyszukiwanie w bucket_key należy interpretować jako wzór wieloznaczny Uniksa. Ustaw mode do reschedule tak, aby zadanie czujnika zwalniało miejsce pracownika w przypadku niespełnienia kryteriów i zostało przełożone na później. Najlepszym rozwiązaniem jest używanie tego trybu, gdy poke_interval trwa dłużej niż 1 minutę, aby zapobiec zbyt dużemu obciążeniu programu planującego.
  • Gdy plik będzie już dostępny w zasobniku S3, robot AWS Glue uruchamia się przy użyciu pliku GlueCrawlerOperator aby przeszukać segment źródłowy S3 sample-inp-bucket-etl-<username> w obszarze Konto A i aktualizuje metadane tabeli w obszarze products_db bazy danych w Katalogu Danych. Robot wykorzystuje rolę AWS Glue i bazę danych Data Catalog utworzoną w poprzednich krokach.
  • DAG używa GlueCrawlerSensor poczekać na zakończenie działania robota.
  • Po zakończeniu zadania przeszukiwacza GlueJobOperator służy do uruchamiania zadania klejenia AWS. Nazwa skryptu AWS Glue (wraz z lokalizacją) jest przekazywana operatorowi wraz z rolą AWS Glue IAM. Inne parametry, np GlueVersion, NumberofWorkers, WorkerType są przekazywane za pomocą create_job_kwargs parametr.
  • DAG używa GlueJobSensor poczekać na zakończenie zadania klejenia AWS. Po zakończeniu pojawi się tabela pomostowa Redshift products zostaną załadowane dane z pliku S3.
  • Możesz połączyć się z Amazon Redshift z Airflow za pomocą trzech różnych operatorzy:
    • PythonOperator.
    • SQLExecuteQueryOperator, który wykorzystuje połączenie PostgreSQL i redshift_default jako połączenie domyślne.
    • RedshiftDataOperator, który korzysta z interfejsu API danych Redshift i aws_default jako połączenie domyślne.

W naszym DAG używamy SQLExecuteQueryOperator i RedshiftDataOperator aby pokazać, jak używać tych operatorów. Uruchamiane są procedury składowane Redshift RedshiftDataOperator. DAG uruchamia również polecenia SQL w Amazon Redshift, aby usunąć dane z tabeli pomostowej za pomocą SQLExecuteQueryOperator.

Ponieważ skonfigurowaliśmy nasze środowisko Amazon MWAA do wyszukiwania połączeń w Menedżerze sekretów, po uruchomieniu DAG pobiera szczegóły połączenia Redshift, takie jak nazwa użytkownika, hasło, host, port i region z Menedżera sekretów. Jeśli połączenie nie zostanie znalezione w Menedżerze sekretów, wartości zostaną pobrane z połączeń domyślnych.

In SQLExecuteQueryOperator, przekazujemy nazwę połączenia, którą utworzyliśmy w Menedżerze sekretów. To szuka airflow/connections/secrets_redshift_connection i odzyskuje sekrety od Menedżera sekretów. Jeśli Menedżer sekretów nie jest skonfigurowany, połączenie utworzone ręcznie (na przykład redshift-conn-id) można przejść.

In RedshiftDataOperator, przekazujemy sekret_arn pliku airflow/connections/redshift_conn_test połączenie utworzone w Menedżerze sekretów jako parametr.

  • Jako ostatnie zadanie, RedshiftToS3Operator służy do wyładowywania danych z tabeli przesunięcia ku czerwieni do segmentu S3 sample-opt-bucket-etl na koncie B. airflow/connections/redshift_conn_test z Menedżera sekretów służy do rozładowywania danych.
  • TriggerRule jest ustawione na ALL_DONE, co umożliwia uruchomienie następnego kroku po zakończeniu wszystkich zadań nadrzędnych.
  • Zależność zadań definiuje się za pomocą chain() funkcję, która w razie potrzeby pozwala na równoległe wykonywanie zadań. W naszym przypadku chcemy, aby wszystkie zadania przebiegały po kolei.

Poniżej znajduje się pełny kod DAG. The dag_id powinna odpowiadać nazwie skryptu DAG, w przeciwnym razie nie zostanie zsynchronizowana z interfejsem użytkownika Airflow.

from datetime import datetime
from airflow import DAG 
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
from airflow.utils.trigger_rule import TriggerRule


dag_id = "data_pipeline"
vYear = datetime.today().strftime("%Y")
vMonth = datetime.today().strftime("%m")
vDay = datetime.today().strftime("%d")
src_bucket_name = "sample-inp-bucket-etl-<username>"
tgt_bucket_name = "sample-opt-bucket-etl-<username>"
s3_folder="products"
#Please replace the variable with the glue_role_arn
glue_role_arn_key = "arn:aws:iam::<account_id>:role/<Glue-role>"
glue_crawler_name = "products"
glue_db_name = "products_db"
glue_job_name = "sample_glue_job"
glue_script_location="s3://aws-glue-assets-<account_id>-<region>/scripts/sample_glue_job.py"
workgroup_name = "sample-workgroup"
redshift_table = "products_f"
redshift_conn_id_name="secrets_redshift_connection"
db_name = "dev"
secret_arn="arn:aws:secretsmanager:us-east-1:<account_id>:secret:airflow/connections/redshift_conn_test-xxxx"
poll_interval = 10

@task
def get_role_name(arn: str) -> str:
    return arn.split("/")[-1]

@task
def get_s3_loc(s3_folder: str) -> str:
    s3_loc  = s3_folder + "/year=" + vYear + "/month=" + vMonth + "/day=" + vDay + "/*.csv"
    return s3_loc

with DAG(
    dag_id=dag_id,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
) as dag:
    role_arn = glue_role_arn_key
    glue_role_name = get_role_name(role_arn)
    s3_loc = get_s3_loc(s3_folder)


    # Check for new incremental files in S3 source/input bucket
    sensor_key = S3KeySensor(
        task_id="sensor_key",
        bucket_key=s3_loc,
        bucket_name=src_bucket_name,
        wildcard_match=True,
        #timeout=18*60*60,
        #poke_interval=120,
        timeout=60,
        poke_interval=30,
        mode="reschedule"
    )

    # Run Glue crawler
    glue_crawler_config = {
        "Name": glue_crawler_name,
        "Role": role_arn,
        "DatabaseName": glue_db_name,
    }

    crawl_s3 = GlueCrawlerOperator(
        task_id="crawl_s3",
        config=glue_crawler_config,
    )

    # GlueCrawlerOperator waits by default, setting as False to test the Sensor below.
    crawl_s3.wait_for_completion = False

    # Wait for Glue crawler to complete
    wait_for_crawl = GlueCrawlerSensor(
        task_id="wait_for_crawl",
        crawler_name=glue_crawler_name,
    )

    # Run Glue Job
    submit_glue_job = GlueJobOperator(
        task_id="submit_glue_job",
        job_name=glue_job_name,
        script_location=glue_script_location,
        iam_role_name=glue_role_name,
        create_job_kwargs={"GlueVersion": "4.0", "NumberOfWorkers": 10, "WorkerType": "G.1X"},
    )

    # GlueJobOperator waits by default, setting as False to test the Sensor below.
    submit_glue_job.wait_for_completion = False

    # Wait for Glue Job to complete
    wait_for_job = GlueJobSensor(
        task_id="wait_for_job",
        job_name=glue_job_name,
        # Job ID extracted from previous Glue Job Operator task
        run_id=submit_glue_job.output,
        verbose=True,  # prints glue job logs in airflow logs
    )

    wait_for_job.poke_interval = 5

    # Execute the Stored Procedure in Redshift Serverless using Data Operator
    execute_redshift_stored_proc = RedshiftDataOperator(
        task_id="execute_redshift_stored_proc",
        database=db_name,
        workgroup_name=workgroup_name,
        secret_arn=secret_arn,
        sql="""CALL sp_products();""",
        poll_interval=poll_interval,
        wait_for_completion=True,
    )

    # Execute the Stored Procedure in Redshift Serverless using SQL Operator
    delete_from_table = SQLExecuteQueryOperator(
        task_id="delete_from_table",
        conn_id=redshift_conn_id_name,
        sql="DELETE FROM products;",
        trigger_rule=TriggerRule.ALL_DONE,
    )

    # Unload the data from Redshift table to S3
    transfer_redshift_to_s3 = RedshiftToS3Operator(
        task_id="transfer_redshift_to_s3",
        s3_bucket=tgt_bucket_name,
        s3_key=s3_loc,
        schema="PUBLIC",
        table=redshift_table,
        redshift_conn_id=redshift_conn_id_name,
    )

    transfer_redshift_to_s3.trigger_rule = TriggerRule.ALL_DONE

    #Chain the tasks to be executed
    chain(
        sensor_key,
        crawl_s3,
        wait_for_crawl,
        submit_glue_job,
        wait_for_job,
        execute_redshift_stored_proc,
        delete_from_table,
        transfer_redshift_to_s3
        )
    

Sprawdź działanie DAG

Po utworzeniu pliku DAG (zamień zmienne w skrypcie DAG) i przesłaniu go do s3://sample-airflow-instance/dags folderze, zostanie on automatycznie zsynchronizowany z interfejsem użytkownika Airflow. Wszystkie DAG-y pojawiają się na DAG-i patka. Przełącz ON opcja umożliwienia uruchomienia DAG. Ponieważ nasz DAG jest ustawiony na schedule="@once", musisz ręcznie uruchomić zadanie, wybierając ikonę uruchamiania poniżej Akcje. Po zakończeniu DAG status zostanie zaktualizowany na zielono, jak pokazano na poniższym zrzucie ekranu.

W Linki sekcji dostępne są opcje przeglądania kodu, wykresu, siatki, dziennika i innych. Wybierać Wykres wizualizować DAG w formie wykresu. Jak pokazano na poniższym zrzucie ekranu, każdy kolor węzła oznacza konkretnego operatora, a kolor konturu węzła oznacza konkretny status.

Sprawdź wyniki

W konsoli Amazon Redshift przejdź do Edytor zapytań v2 i wybierz dane w pliku products_f tabela. Tabela powinna zostać załadowana i posiadać taką samą liczbę rekordów jak pliki S3.

Na konsoli Amazon S3 przejdź do segmentu S3 s3://sample-opt-bucket-etl na koncie B. The product_f pliki należy utworzyć w ramach struktury folderów s3://sample-opt-bucket-etl/products/YYYY/MM/DD/.

Sprzątać

Wyczyść zasoby utworzone w ramach tego posta, aby uniknąć ponoszenia bieżących opłat:

  1. Usuń stosy CloudFormation i zasobnik S3 utworzone jako wymagania wstępne.
  2. Usuń VPC i połączenia równorzędne VPC, zasady i role dla wielu kont oraz wpisy tajne w Menedżerze sekretów.

Wnioski

Dzięki Amazon MWAA możesz tworzyć złożone przepływy pracy przy użyciu Airflow i Python bez zarządzania klastrami, węzłami lub innymi narzutami operacyjnymi zwykle związanymi z wdrażaniem i skalowaniem Airflow w środowisku produkcyjnym. W tym poście pokazaliśmy, jak Amazon MWAA zapewnia zautomatyzowany sposób przyjmowania, przekształcania, analizowania i dystrybucji danych pomiędzy różnymi kontami i usługami w AWS. Więcej przykładów innych operatorów AWS można znaleźć poniżej Repozytorium GitHub; zachęcamy do dowiedzenia się więcej poprzez wypróbowanie niektórych z tych przykładów.


O autorach


Radhika Jakkula jest architektem rozwiązań prototypowania Big Data w AWS. Pomaga klientom budować prototypy przy użyciu usług analitycznych AWS i specjalnie zbudowanych baz danych. Jest specjalistką w ocenie szerokiego zakresu wymagań i zastosowaniu odpowiednich usług AWS, narzędzi i frameworków Big Data do stworzenia solidnej architektury.

Sidhantha Muralidhara jest głównym menedżerem ds. technicznych w AWS. Współpracuje z dużymi klientami korporacyjnymi, którzy uruchamiają swoje obciążenia na platformie AWS. Pasjonuje się pracą z klientami i pomaganiem im w projektowaniu obciążeń pod kątem kosztów, niezawodności, wydajności i doskonałości operacyjnej na dużą skalę w ich podróży do chmury. Interesuje się także analizą danych.

spot_img

Najnowsza inteligencja

spot_img