
Orchestrate an end-to-end ETL pipeline using Amazon S3, AWS Glue, and Amazon Redshift Serverless with Amazon MWAA | Amazon Web Services


Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that you can use to set up and operate data pipelines in the cloud at scale. Apache Airflow is an open source tool used to programmatically author, schedule, and monitor sequences of processes and tasks, referred to as workflows. With Amazon MWAA, you can use Apache Airflow and Python to create workflows without having to manage the underlying infrastructure for scalability, availability, and security.
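
For readers new to Airflow, the following is a minimal, hypothetical DAG (not part of the solution in this post) showing what a Python-defined workflow looks like: two tasks chained so one runs after the other. The task names and schedule are illustrative only.

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator

    # A minimal, illustrative DAG: extract then load, run once a day.
    with DAG(
        dag_id="hello_workflow",
        schedule="@daily",
        start_date=datetime(2024, 1, 1),
        catchup=False,
    ) as dag:
        extract = PythonOperator(
            task_id="extract",
            python_callable=lambda: print("extracting data"),
        )
        load = PythonOperator(
            task_id="load",
            python_callable=lambda: print("loading data"),
        )

        # The >> operator defines the dependency: extract runs before load.
        extract >> load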

By using multiple AWS accounts, organizations can scale their workloads effectively and manage their complexity as they grow. This approach provides a robust mechanism to mitigate the potential impact of disruptions or failures, making sure that critical workloads remain operational. It also enables cost optimization by aligning resources with specific use cases, making sure that expenses are well controlled. By isolating workloads with specific security or compliance requirements, organizations can maintain the highest levels of data privacy and security. Furthermore, the ability to organize multiple AWS accounts in a structured manner allows you to align your business processes and resources according to your unique operational, regulatory, and budgetary requirements. This approach promotes efficiency, flexibility, and scalability, enabling large enterprises to meet their evolving needs and achieve their goals.

This post demonstrates how to orchestrate an end-to-end extract, transform, and load (ETL) pipeline using Amazon Simple Storage Service (Amazon S3), AWS Glue, and Amazon Redshift Serverless with Amazon MWAA.

Solution overview

For this post, we consider a use case where a data engineering team wants to build an ETL process and give the best experience to their end-users when they want to query the latest data after new raw files are added to Amazon S3 in the central account (Account A in the following architecture diagram). The data engineering team wants to keep the raw data in its own AWS account (Account B in the diagram) for increased security and control. They also want to perform the data processing and transformation work in their own account (Account B) to separate duties and prevent any unintended changes to the source raw data present in the central account (Account A). This approach allows the team to process the raw data extracted from Account A into Account B, which is dedicated to data handling tasks. This makes sure the raw and processed data can be kept securely isolated across multiple accounts, if required, for enhanced data governance and security.

Our solution uses an end-to-end ETL pipeline orchestrated by Amazon MWAA that looks for new incremental files in an Amazon S3 location in Account A, where the data resides. This is done by invoking AWS Glue ETL jobs and writing to data objects in a Redshift Serverless workgroup in Account B. The pipeline then runs stored procedures and SQL commands on Redshift Serverless. As the queries finish running, an UNLOAD operation is invoked from the Redshift data warehouse to the S3 bucket in Account A.

Because security is important, this post also covers how to configure an Airflow connection using AWS Secrets Manager to avoid storing database credentials within Airflow connections and variables.

The following diagram illustrates the architectural components of the solution.

The workflow consists of the following components:

  • The source and target S3 buckets are in a central account (Account A), whereas Amazon MWAA, AWS Glue, and Amazon Redshift are in a different account (Account B). Cross-account access has been set up between the S3 buckets in Account A and the resources in Account B to be able to load and unload data.
  • In the second account, Amazon MWAA is hosted in one VPC and Redshift Serverless in a different VPC, which are connected through VPC peering. The Redshift Serverless workgroup is secured inside private subnets across three Availability Zones.
  • Secrets like the user name, password, DB port, and AWS Region for Redshift Serverless are stored in Secrets Manager.
  • VPC endpoints are created for Amazon S3 and Secrets Manager to interact with other resources.
  • Usually, data engineers create an Airflow Directed Acyclic Graph (DAG) and commit their changes to GitHub. With GitHub actions, they are deployed to an S3 bucket in Account B (for this post, we upload the files into the S3 bucket directly). The S3 bucket stores Airflow-related files like DAG files, requirements.txt files, and plugins. AWS Glue ETL scripts and assets are stored in another S3 bucket. This separation helps with maintaining organization and avoids confusion.
  • The Airflow DAG uses various operators, sensors, connections, tasks, and rules to run the data pipeline as needed.
  • The Airflow logs are logged in Amazon CloudWatch, and alerts can be configured for monitoring tasks. For more information, see Monitoring dashboards and alarms on Amazon MWAA.

Prerequisites

Because this solution centers around using Amazon MWAA to orchestrate the ETL pipeline, you need to set up certain foundational resources across the accounts beforehand. Specifically, you need to create the S3 buckets and folders, AWS Glue resources, and Redshift Serverless resources in their respective accounts prior to implementing the full workflow integration using Amazon MWAA.

Deploy resources in Account A using AWS CloudFormation

In Account A, launch the provided AWS CloudFormation stack to create the following resources:

  • The source and target S3 buckets and folders. As a best practice, the input and output bucket structures are formatted with hive style partitioning as s3://<bucket>/products/YYYY/MM/DD/ (a sketch of how a daily file lands in this layout follows this list).
  • A sample dataset called products.csv, which we use in this post.
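
As an illustration of that layout, the following is a minimal sketch of dropping a daily file into the source bucket with boto3. The bucket name mirrors the placeholder used later in this post, the local file path is hypothetical, and the key uses the year=/month=/day= form that the DAG's sensor polls for later in this post.

    from datetime import datetime

    import boto3

    # Hypothetical values -- substitute your own bucket name and local file path.
    bucket = "sample-inp-bucket-etl-<username>"
    local_file = "products.csv"

    # Build the key for today's date, for example products/year=2024/month=05/day=17/products.csv.
    today = datetime.today()
    key = f"products/year={today:%Y}/month={today:%m}/day={today:%d}/products.csv"

    s3 = boto3.client("s3")
    s3.upload_file(local_file, bucket, key)
    print(f"Uploaded to s3://{bucket}/{key}")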

Upload the AWS Glue job to Amazon S3 in Account B

In Account B, create an Amazon S3 location called aws-glue-assets-<account-id>-<region>/scripts (if not present). Replace the parameters for the account ID and Region in the sample_glue_job.py script and upload the AWS Glue job file to the Amazon S3 location.

Deploy resources in Account B using AWS CloudFormation

In Account B, launch the provided CloudFormation stack to create the following resources:

  • S3 bucket airflow-<username>-bucket to store Airflow-related files with the following structure:
    • dags - The folder for DAG files.
    • plugins - The file for any custom or community Airflow plugins.
    • requirements - The requirements.txt file for any Python packages.
    • scripts - Any SQL scripts used in the DAG.
    • data - Any datasets used in the DAG.
  • A Redshift Serverless workgroup and namespace. The workgroup and namespace names are prefixed with sample.
  • An AWS Glue environment, which contains the following:
    • An AWS Glue crawler, which crawls the data from the S3 source bucket sample-inp-bucket-etl-<username> in Account A.
    • A database called products_db in the AWS Glue Data Catalog.
    • An ELT job called sample_glue_job. This job can read files from the products table in the Data Catalog and load data into the Redshift table products.
  • A VPC gateway endpoint to Amazon S3.
  • An Amazon MWAA environment. For detailed steps to create an Amazon MWAA environment using the Amazon MWAA console, refer to Introducing Amazon Managed Workflows for Apache Airflow (MWAA).


Create Amazon Redshift resources

Create two tables and a stored procedure on the Redshift Serverless workgroup using the sample.sql file.

In this example, we create two tables called products and products_f. The stored procedure name is sp_products.
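
If you would rather run the statements programmatically than through the query editor, the following is a minimal sketch using the Redshift Data API. The column definitions and procedure body are hypothetical placeholders; the actual DDL is in the provided sample.sql file.

    import boto3

    # Hypothetical column definitions for illustration only -- the real DDL is in sample.sql.
    DDL_STATEMENTS = [
        "CREATE TABLE IF NOT EXISTS public.products (product_id INT, product_name VARCHAR(100), price DECIMAL(10,2));",
        "CREATE TABLE IF NOT EXISTS public.products_f (product_id INT, product_name VARCHAR(100), price DECIMAL(10,2));",
        """CREATE OR REPLACE PROCEDURE sp_products() AS $$
           BEGIN
             INSERT INTO public.products_f SELECT * FROM public.products;
           END;
           $$ LANGUAGE plpgsql;""",
    ]

    client = boto3.client("redshift-data", region_name="us-east-1")

    # batch_execute_statement runs the statements in order against the serverless workgroup,
    # authenticating with the caller's IAM identity (temporary credentials).
    client.batch_execute_statement(
        WorkgroupName="sample-workgroup",
        Database="dev",
        Sqls=DDL_STATEMENTS,
    )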

Set up Airflow permissions

After the Amazon MWAA environment is created successfully, the status will show as Available. Choose Open Airflow UI to view the Airflow UI. DAGs are automatically synced from the S3 bucket and visible in the UI. However, at this stage, there are no DAGs in the S3 folder.

Add the customer managed policy AmazonMWAAFullConsoleAccess, which grants Airflow users permissions to access AWS Identity and Access Management (IAM) resources, and attach this policy to the Amazon MWAA role. For more information, see Accessing an Amazon MWAA environment.

The policies attached to the Amazon MWAA role have full access and must only be used for testing purposes in a secure test environment. For production deployments, follow the least privilege principle.

Set up the environment

This section outlines the steps to configure the environment. The process consists of the following high-level steps:

  1. Update any necessary providers.
  2. Set up cross-account access.
  3. Establish a VPC peering connection between the Amazon MWAA VPC and the Amazon Redshift VPC.
  4. Configure Secrets Manager to integrate with Amazon MWAA.
  5. Define Airflow connections.

Update the providers

Follow the steps in this section if your Amazon MWAA version is less than 2.8.1 (the latest version as of writing this post).

Providers are packages that are maintained by the community and include all the core operators, hooks, and sensors for a given service. The Amazon provider is used to interact with AWS services like Amazon S3, Amazon Redshift Serverless, AWS Glue, and more. There are over 200 modules within the Amazon provider.

Although the version of Airflow supported in Amazon MWAA is 2.6.3, which comes bundled with Amazon provider package version 8.2.0, support for Amazon Redshift Serverless was not added until Amazon provider package version 8.4.0. Because the default bundled provider version is older than when Redshift Serverless support was introduced, the provider version must be upgraded in order to use that functionality.

The first step is to update the constraints file and the requirements.txt file with the correct versions. Refer to Specifying newer provider packages for steps to update the Amazon provider package.

  1. Specify the package requirements as follows:
    --constraint "/usr/local/airflow/dags/constraints-3.10-mod.txt"
    apache-airflow-providers-amazon==8.4.0

  2. Update the version in the constraints file to 8.4.0 or higher.
  3. Add the constraints-3.11-updated.txt file to the /dags folder.

Refer to Apache Airflow versions on Amazon Managed Workflows for Apache Airflow for the correct versions of the constraints file depending on the Airflow version.

  1. Navigate to the Amazon MWAA environment and choose Edit.
  2. Under DAG code in Amazon S3, for Requirements file, choose the latest version.
  3. Choose Save.

This will update the environment and the new providers will take effect.

  1. To verify the providers version, navigate to Providers under the Admin menu.

The version for the Amazon provider package should be 8.4.0, as shown in the following screenshot. If not, there was an error while loading requirements.txt. To debug the errors, go to the CloudWatch console and open the requirements_install_ip log in Log streams, where the errors are listed. Refer to Enabling logs on the Amazon MWAA console for additional details.

Set up cross-account access

You need to set up cross-account policies and roles between Account A and Account B to access the S3 buckets to load and unload data. Complete the following steps:

  1. In Account A, configure the bucket policy for the bucket sample-inp-bucket-etl-<username> to grant permissions to the AWS Glue and Amazon MWAA roles in Account B for objects in the bucket sample-inp-bucket-etl-<username>:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": [
                        "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<Glue-role>",
                        "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
                    ]
                },
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl",
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-inp-bucket-etl-<username>/*",
                    "arn:aws:s3:::sample-inp-bucket-etl-<username>"
                ]
            }
        ]
    }
    

  2. Similarly, configure the bucket policy for the bucket sample-opt-bucket-etl-<username> to grant permissions to the Amazon MWAA role in Account B to put objects in this bucket:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
                },
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl",
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>/*",
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>"
                ]
            }
        ]
    }
    

  3. In Account A, create an IAM policy called policy_for_roleA, which allows the necessary Amazon S3 actions on the output bucket:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "kms:Decrypt",
                    "kms:Encrypt",
                    "kms:GenerateDataKey"
                ],
                "Resource": [
                    "<KMS_KEY_ARN_Used_for_S3_encryption>"
                ]
            },
            {
                "Sid": "VisualEditor1",
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:GetBucketAcl",
                    "s3:GetBucketCors",
                    "s3:GetEncryptionConfiguration",
                    "s3:GetBucketLocation",
                    "s3:ListAllMyBuckets",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:ListBucketVersions",
                    "s3:ListMultipartUploadParts"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>",
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>/*"
                ]
            }
        ]
    }

  4. Create a new IAM role called RoleA with Account B as the trusted entity and add this policy to the role. This allows Account B to assume RoleA to perform the necessary Amazon S3 actions on the output bucket.
  5. In Account B, create an IAM policy called s3-cross-account-access with permission to access objects in the bucket sample-inp-bucket-etl-<username>, which is in Account A.
  6. Add this policy to the AWS Glue role and the Amazon MWAA role:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl"
                ],
                "Resource": "arn:aws:s3:::sample-inp-bucket-etl-<username>/*"
            }
        ]
    }

  7. In Account B, create the IAM policy policy_for_roleB specifying Account A as a trusted entity. The following is the trust policy to assume RoleA in Account A:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "CrossAccountPolicy",
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": "arn:aws:iam::<account-id-of-AcctA>:role/RoleA"
            }
        ]
    }

  8. Create a new IAM role called RoleB with Amazon Redshift as the trusted entity type and add this policy to the role. This allows RoleB to assume RoleA in Account A and also to be assumed by Amazon Redshift.
  9. Attach RoleB to the Redshift Serverless namespace, so Amazon Redshift can write objects to the S3 output bucket in Account A.
  10. Attach the policy policy_for_roleB to the Amazon MWAA role, which allows Amazon MWAA to access the output bucket in Account A.

Refer to How do I provide cross-account access to objects that are in Amazon S3 buckets? for more details on setting up cross-account access to objects in Amazon S3 from AWS Glue and Amazon MWAA. Refer to How do I COPY or UNLOAD data from Amazon Redshift to an Amazon S3 bucket in another account? for more details on setting up roles to unload data from Amazon Redshift to Amazon S3 from Amazon MWAA.

Set up VPC peering between the Amazon MWAA and Amazon Redshift VPCs

Because Amazon MWAA and Amazon Redshift are in two separate VPCs, you need to set up VPC peering between them. You must add a route to the route tables associated with the subnets for both services. Refer to Work with VPC peering connections for details on VPC peering.

Make sure that the CIDR range of the Amazon MWAA VPC is allowed in the Redshift security group and that the CIDR range of the Amazon Redshift VPC is allowed in the Amazon MWAA security group, as shown in the following screenshot.
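
As an alternative to the console steps linked above, the following is a minimal boto3 sketch of the same peering and security group setup. All VPC, route table, and security group IDs, as well as the CIDR ranges, are hypothetical placeholders.

    import boto3

    ec2 = boto3.client("ec2", region_name="us-east-1")

    # Hypothetical IDs -- replace with your Amazon MWAA and Amazon Redshift VPC resources.
    mwaa_vpc_id, redshift_vpc_id = "vpc-aaa111", "vpc-bbb222"
    mwaa_route_table, redshift_route_table = "rtb-aaa111", "rtb-bbb222"
    mwaa_cidr, redshift_cidr = "10.0.0.0/16", "10.1.0.0/16"
    mwaa_sg, redshift_sg = "sg-aaa111", "sg-bbb222"

    # Create and accept the peering connection between the two VPCs.
    peering = ec2.create_vpc_peering_connection(VpcId=mwaa_vpc_id, PeerVpcId=redshift_vpc_id)
    pcx_id = peering["VpcPeeringConnection"]["VpcPeeringConnectionId"]
    ec2.accept_vpc_peering_connection(VpcPeeringConnectionId=pcx_id)

    # Route each VPC's traffic destined for the other VPC through the peering connection.
    ec2.create_route(RouteTableId=mwaa_route_table, DestinationCidrBlock=redshift_cidr, VpcPeeringConnectionId=pcx_id)
    ec2.create_route(RouteTableId=redshift_route_table, DestinationCidrBlock=mwaa_cidr, VpcPeeringConnectionId=pcx_id)

    # Allow the MWAA CIDR into the Redshift security group (port 5439) and vice versa.
    ec2.authorize_security_group_ingress(
        GroupId=redshift_sg,
        IpPermissions=[{"IpProtocol": "tcp", "FromPort": 5439, "ToPort": 5439, "IpRanges": [{"CidrIp": mwaa_cidr}]}],
    )
    ec2.authorize_security_group_ingress(
        GroupId=mwaa_sg,
        IpPermissions=[{"IpProtocol": "-1", "IpRanges": [{"CidrIp": redshift_cidr}]}],
    )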

If any of the preceding steps are configured incorrectly, you are likely to encounter a "Connection Timeout" error in the DAG run.

Configure the Amazon MWAA connection with Secrets Manager

When the Amazon MWAA pipeline is configured to use Secrets Manager, it will first look for connections and variables in an alternate backend (like Secrets Manager). If the alternate backend contains the needed value, it is returned. Otherwise, it will check the metadata database for the value and return that instead. For more details, refer to Configuring an Apache Airflow connection using an AWS Secrets Manager secret.

Complete the following steps:

  1. Set up a VPC endpoint to link Amazon MWAA and Secrets Manager (com.amazonaws.us-east-1.secretsmanager).

This allows Amazon MWAA to access the credentials stored in Secrets Manager.
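
The following is a minimal boto3 sketch of creating that interface endpoint; the VPC, subnet, and security group IDs are hypothetical placeholders for the Amazon MWAA VPC resources.

    import boto3

    ec2 = boto3.client("ec2", region_name="us-east-1")

    # Hypothetical IDs -- use the Amazon MWAA VPC, its private subnets, and its security group.
    ec2.create_vpc_endpoint(
        VpcEndpointType="Interface",
        VpcId="vpc-aaa111",
        ServiceName="com.amazonaws.us-east-1.secretsmanager",
        SubnetIds=["subnet-111", "subnet-222"],
        SecurityGroupIds=["sg-aaa111"],
        PrivateDnsEnabled=True,  # lets the standard Secrets Manager DNS name resolve to the endpoint
    )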

  1. To provide Amazon MWAA with permission to access Secrets Manager secret keys, add the policy called SecretsManagerReadWrite to the IAM role of the environment.
  2. To create the Secrets Manager backend as an Apache Airflow configuration option, go to the Airflow configuration options, add the following key-value pairs, and save your settings.

This configures Airflow to look for connection strings and variables at the airflow/connections/* and airflow/variables/* paths:

secrets.backend: airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
secrets.backend_kwargs: {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}

  1. To generate an Airflow connection URI string, go to AWS CloudShell and enter into a Python shell.
  2. Run the following code to generate the connection URI string:
    import urllib.parse
    conn_type = 'redshift'
    host = 'sample-workgroup.<account-id-of-AcctB>.us-east-1.redshift-serverless.amazonaws.com' #Specify the Amazon Redshift workgroup endpoint
    port = '5439'
    login = 'admin' #Specify the username to use for authentication with Amazon Redshift
    password = '<password>' #Specify the password to use for authentication with Amazon Redshift
    role_arn = urllib.parse.quote_plus('arn:aws:iam::<account_id>:role/service-role/<MWAA-role>')
    database = 'dev'
    region = 'us-east-1' #YOUR_REGION
    conn_string = '{0}://{1}:{2}@{3}:{4}?role_arn={5}&database={6}&region={7}'.format(conn_type, login, password, host, port, role_arn, database, region)
    print(conn_string)
    

The connection string should be generated as follows:

redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=<region>

  1. Add the connection in Secrets Manager using the following command in the AWS Command Line Interface (AWS CLI).

This can also be done from the Secrets Manager console. Note that this will be added in Secrets Manager as plaintext.

aws secretsmanager create-secret --name airflow/connections/secrets_redshift_connection --description "Apache Airflow to Redshift Cluster" --secret-string "redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=us-east-1" --region=us-east-1

Use the connection airflow/connections/secrets_redshift_connection in the DAG. When the DAG is run, it will look for this connection and retrieve the secrets from Secrets Manager. In the case of RedshiftDataOperator, pass the secret_arn as a parameter instead of the connection name.

You can also add secrets using the Secrets Manager console as key-value pairs.

  1. Add another secret in Secrets Manager and save it as airflow/connections/redshift_conn_test (a sketch of doing this programmatically follows).
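
The following is a minimal sketch of creating that secret with boto3 instead of the console. It assumes the secret holds the Redshift credentials as JSON key-value pairs (the user name and password shown are placeholders), which is the shape the Redshift Data API expects when a secret ARN is supplied.

    import json

    import boto3

    client = boto3.client("secretsmanager", region_name="us-east-1")

    # Placeholder credentials -- store your actual Redshift Serverless admin credentials.
    client.create_secret(
        Name="airflow/connections/redshift_conn_test",
        Description="Redshift Serverless credentials used by the DAG",
        SecretString=json.dumps({"username": "admin", "password": "<password>"}),
    )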

Create an Airflow connection through the metadata database

You can also create connections in the UI. In this case, the connection details will be stored in the Airflow metadata database. If the Amazon MWAA environment is not configured to use the Secrets Manager backend, it will check the metadata database for the value and return that. You can create an Airflow connection using the UI, AWS CLI, or API. In this section, we show how to create a connection using the Airflow UI.

  1. For Connection Id, enter a name for the connection.
  2. For Connection Type, choose Amazon Redshift.
  3. For Host, enter the Redshift endpoint (without the port and database) for Redshift Serverless.
  4. For Database, enter dev.
  5. For User, enter your admin user name.
  6. For Password, enter your password.
  7. For Port, use port 5439.
  8. For Extra, set the region and timeout parameters.
  9. Test the connection, then save your settings.

Create and run the DAG

In this section, we describe how to create the DAG using various components. After you create and run the DAG, you can verify the results by querying the Redshift tables and checking the target S3 buckets.

Create the DAG

In Airflow, data pipelines are defined in Python code as DAGs. We create a DAG that consists of various operators, sensors, connections, tasks, and rules:

  • The DAG starts by looking for source files in the S3 bucket sample-inp-bucket-etl-<username> under Account A for the current day using S3KeySensor. S3KeySensor is used to wait for one or multiple keys to be present in an S3 bucket.
    • For example, our S3 bucket is partitioned as s3://bucket/products/YYYY/MM/DD/, so our sensor should check for folders with the current date. We derive the current date in the DAG and pass it to S3KeySensor, which looks for any new files in the current day's folder.
    • We also set wildcard_match to True, which enables the search on bucket_key to be interpreted as a Unix wildcard pattern. Set mode to reschedule so that the sensor task frees the worker slot when the criteria is not met and the task is rescheduled for a later time. As a best practice, use this mode when poke_interval is more than 1 minute to prevent too much load on the scheduler.
  • After a file is available in the S3 bucket, the AWS Glue crawler runs using GlueCrawlerOperator to crawl the S3 source bucket sample-inp-bucket-etl-<username> under Account A and update the table metadata under the products_db database in the Data Catalog. The crawler uses the AWS Glue role and Data Catalog database that were created in the previous steps.
  • The DAG uses GlueCrawlerSensor to wait for the crawler to complete.
  • When the crawler job is complete, GlueJobOperator is used to run the AWS Glue job. The AWS Glue script name (along with its location) and the AWS Glue IAM role are passed to the operator. Other parameters like GlueVersion, NumberofWorkers, and WorkerType are passed using the create_job_kwargs parameter.
  • The DAG uses GlueJobSensor to wait for the AWS Glue job to complete. When it's complete, the Redshift staging table products will be loaded with the data from the S3 file.
  • You can connect to Amazon Redshift from Airflow using three different operators:
    • PythonOperator.
    • SQLExecuteQueryOperator, which uses a PostgreSQL connection and redshift_default as the default connection ID.
    • RedshiftDataOperator, which uses the Redshift Data API and aws_default as the default connection ID.

In our DAG, we use SQLExecuteQueryOperator and RedshiftDataOperator to show how to use these operators. The Redshift stored procedures are run using RedshiftDataOperator. The DAG also runs SQL commands in Amazon Redshift to delete the data from the table using SQLExecuteQueryOperator.

Because we configured our Amazon MWAA environment to look for connections in Secrets Manager, when the DAG runs, it retrieves the Redshift connection details like the user name, password, host, port, and Region from Secrets Manager. If the connection is not found in Secrets Manager, the values are retrieved from the default connections.

In SQLExecuteQueryOperator, we pass the connection name that we created in Secrets Manager. It looks for airflow/connections/secrets_redshift_connection and retrieves the secrets from Secrets Manager. If Secrets Manager is not set up, the manually created connection (for example, redshift-conn-id) can be passed instead.

In RedshiftDataOperator, we pass the secret_arn of the airflow/connections/redshift_conn_test connection created in Secrets Manager as a parameter.

  • As the final task, RedshiftToS3Operator is used to unload the data from the Redshift table to the S3 bucket sample-opt-bucket-etl in Account B. airflow/connections/redshift_conn_test from Secrets Manager is used for unloading the data.
  • TriggerRule is set to ALL_DONE, which enables the next step to run after all upstream tasks are complete.
  • The dependency of tasks is defined using the chain() function, which allows parallel runs of tasks if needed. In our case, we want all tasks to run in sequence.

The following is the complete DAG code. The dag_id should match the name of the DAG script, otherwise it won't be synced into the Airflow UI.

from datetime import datetime
from airflow import DAG 
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
from airflow.utils.trigger_rule import TriggerRule


dag_id = "data_pipeline"
vYear = datetime.today().strftime("%Y")
vMonth = datetime.today().strftime("%m")
vDay = datetime.today().strftime("%d")
src_bucket_name = "sample-inp-bucket-etl-<username>"
tgt_bucket_name = "sample-opt-bucket-etl-<username>"
s3_folder="products"
#Please replace the variable with the glue_role_arn
glue_role_arn_key = "arn:aws:iam::<account_id>:role/<Glue-role>"
glue_crawler_name = "products"
glue_db_name = "products_db"
glue_job_name = "sample_glue_job"
glue_script_location="s3://aws-glue-assets-<account_id>-<region>/scripts/sample_glue_job.py"
workgroup_name = "sample-workgroup"
redshift_table = "products_f"
redshift_conn_id_name="secrets_redshift_connection"
db_name = "dev"
secret_arn="arn:aws:secretsmanager:us-east-1:<account_id>:secret:airflow/connections/redshift_conn_test-xxxx"
poll_interval = 10

@task
def get_role_name(arn: str) -> str:
    return arn.split("/")[-1]

@task
def get_s3_loc(s3_folder: str) -> str:
    s3_loc  = s3_folder + "/year=" + vYear + "/month=" + vMonth + "/day=" + vDay + "/*.csv"
    return s3_loc

with DAG(
    dag_id=dag_id,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
) as dag:
    role_arn = glue_role_arn_key
    glue_role_name = get_role_name(role_arn)
    s3_loc = get_s3_loc(s3_folder)


    # Check for new incremental files in S3 source/input bucket
    sensor_key = S3KeySensor(
        task_id="sensor_key",
        bucket_key=s3_loc,
        bucket_name=src_bucket_name,
        wildcard_match=True,
        #timeout=18*60*60,
        #poke_interval=120,
        timeout=60,
        poke_interval=30,
        mode="reschedule"
    )

    # Run Glue crawler
    glue_crawler_config = {
        "Name": glue_crawler_name,
        "Role": role_arn,
        "DatabaseName": glue_db_name,
    }

    crawl_s3 = GlueCrawlerOperator(
        task_id="crawl_s3",
        config=glue_crawler_config,
    )

    # GlueCrawlerOperator waits by default, setting as False to test the Sensor below.
    crawl_s3.wait_for_completion = False

    # Wait for Glue crawler to complete
    wait_for_crawl = GlueCrawlerSensor(
        task_id="wait_for_crawl",
        crawler_name=glue_crawler_name,
    )

    # Run Glue Job
    submit_glue_job = GlueJobOperator(
        task_id="submit_glue_job",
        job_name=glue_job_name,
        script_location=glue_script_location,
        iam_role_name=glue_role_name,
        create_job_kwargs={"GlueVersion": "4.0", "NumberOfWorkers": 10, "WorkerType": "G.1X"},
    )

    # GlueJobOperator waits by default, setting as False to test the Sensor below.
    submit_glue_job.wait_for_completion = False

    # Wait for Glue Job to complete
    wait_for_job = GlueJobSensor(
        task_id="wait_for_job",
        job_name=glue_job_name,
        # Job ID extracted from previous Glue Job Operator task
        run_id=submit_glue_job.output,
        verbose=True,  # prints glue job logs in airflow logs
    )

    wait_for_job.poke_interval = 5

    # Execute the Stored Procedure in Redshift Serverless using Data Operator
    execute_redshift_stored_proc = RedshiftDataOperator(
        task_id="execute_redshift_stored_proc",
        database=db_name,
        workgroup_name=workgroup_name,
        secret_arn=secret_arn,
        sql="""CALL sp_products();""",
        poll_interval=poll_interval,
        wait_for_completion=True,
    )

    # Execute the Stored Procedure in Redshift Serverless using SQL Operator
    delete_from_table = SQLExecuteQueryOperator(
        task_id="delete_from_table",
        conn_id=redshift_conn_id_name,
        sql="DELETE FROM products;",
        trigger_rule=TriggerRule.ALL_DONE,
    )

    # Unload the data from Redshift table to S3
    transfer_redshift_to_s3 = RedshiftToS3Operator(
        task_id="transfer_redshift_to_s3",
        s3_bucket=tgt_bucket_name,
        s3_key=s3_loc,
        schema="PUBLIC",
        table=redshift_table,
        redshift_conn_id=redshift_conn_id_name,
    )

    transfer_redshift_to_s3.trigger_rule = TriggerRule.ALL_DONE

    #Chain the tasks to be executed
    chain(
        sensor_key,
        crawl_s3,
        wait_for_crawl,
        submit_glue_job,
        wait_for_job,
        execute_redshift_stored_proc,
        delete_from_table,
        transfer_redshift_to_s3
        )
    

Verify the DAG run

After you create the DAG file (replace the variables in the DAG script) and upload it to the s3://sample-airflow-instance/dags folder, it will be automatically synced with the Airflow UI. All DAGs appear on the DAGs tab. Toggle the ON option to make the DAG runnable. Because our DAG is set to schedule="@once", you need to manually run the job by choosing the run icon under Actions. When the DAG is complete, the status is updated in green, as shown in the following screenshot.

In the Links section, there are options to view the code, graph, grid, logs, and more. Choose Graph to visualize the DAG in a graph format. As shown in the following screenshot, each color of the node denotes a specific operator, and the color of the node outline denotes a specific status.

Verify the results

On the Amazon Redshift console, navigate to the Query Editor v2 and select the data in the products_f table. The table should be loaded and have the same number of records as the S3 files.
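
If you prefer to check the row count programmatically instead of through the query editor, the following is a minimal sketch using the Redshift Data API. The workgroup, database, and secret ARN mirror the placeholders used in the DAG code earlier.

    import time

    import boto3

    client = boto3.client("redshift-data", region_name="us-east-1")

    # Placeholders matching the DAG variables -- replace with your own values.
    resp = client.execute_statement(
        WorkgroupName="sample-workgroup",
        Database="dev",
        SecretArn="arn:aws:secretsmanager:us-east-1:<account_id>:secret:airflow/connections/redshift_conn_test-xxxx",
        Sql="SELECT COUNT(*) FROM public.products_f;",
    )

    # Poll until the statement finishes, then read back the single count value.
    while client.describe_statement(Id=resp["Id"])["Status"] not in ("FINISHED", "FAILED", "ABORTED"):
        time.sleep(2)

    result = client.get_statement_result(Id=resp["Id"])
    print("Rows in products_f:", result["Records"][0][0]["longValue"])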

On the Amazon S3 console, navigate to the S3 bucket s3://sample-opt-bucket-etl in Account B. The products_f files should be created under the folder structure s3://sample-opt-bucket-etl/products/YYYY/MM/DD/.

Clean up

Clean up the resources created as part of this post to avoid incurring ongoing charges:

  1. Delete the CloudFormation stacks and S3 buckets that you created as prerequisites.
  2. Delete the VPCs and VPC peering connections, cross-account policies and roles, and secrets in Secrets Manager.

Conclusion

With Amazon MWAA, you can build complex workflows using Airflow and Python without managing clusters, nodes, or any other operational overhead typically associated with deploying and scaling Airflow in production. In this post, we showed how Amazon MWAA provides an automated way to ingest, transform, analyze, and distribute data between different accounts and services within AWS. For more examples of other AWS operators, refer to the following GitHub repository; we encourage you to learn more by trying out some of these examples.


About the Authors


Radhika Jakkula is a Big Data Prototyping Solutions Architect at AWS. She helps customers build prototypes using AWS analytics services and purpose-built databases. She is a specialist in assessing a wide range of requirements and applying the relevant AWS services, big data tools, and frameworks to create a robust architecture.

Sidhanth Muralidhar is a Principal Technical Account Manager at AWS. He works with large enterprise customers who run their workloads on AWS. He is passionate about working with customers and helping them architect their workloads for cost, reliability, performance, and operational excellence at scale in their cloud journey. He has a keen interest in data analytics as well.
