Essential Functionalities to Guide you While using AWS Glue and PySpark!

Introduction

In this post, I have put together AWS Glue and PySpark functionalities that can be helpful when you are designing an AWS data pipeline and writing AWS Glue PySpark scripts.

AWS Glue is a fully managed extract, transform, and load (ETL) service to process large datasets from various sources for analytics and data processing.

While creating the AWS Glue job, you can select between Spark, Spark Streaming, and Python shell. These jobs can run a proposed script generated by AWS Glue, an existing script that you provide, or a new script authored by you. You can also select different monitoring options, job execution capacity, timeouts, delayed-notification thresholds, and overridable and non-overridable parameters.

AWS recently launched Glue version 2.0, which features 10x faster Spark ETL job start times and reduces the billing minimum from 10 minutes to 1 minute.

With AWS Glue you can create a development endpoint and configure a SageMaker or Zeppelin notebook to develop and test your Glue ETL scripts.

I create a SageMaker notebook connected to the development endpoint to author and test the ETL scripts. Depending on the language you are comfortable with, you can spin up the notebook accordingly.

Now, let’s talk about some specific features and functionalities in AWS Glue and PySpark which can be helpful.

1. Spark DataFrames

A Spark DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database. You can create a DataFrame from an RDD or from file formats like CSV, JSON, and Parquet.

With the SageMaker Sparkmagic (PySpark) kernel notebook, the Spark session is created automatically.

To create a DataFrame:

# from CSV files
S3_IN = "s3://mybucket/train/training.csv"

csv_df = (
    spark.read.format("org.apache.spark.csv")
    .option("header", True)
    .option("quote", '"')
    .option("escape", '"')
    .option("inferSchema", True)
    .option("ignoreLeadingWhiteSpace", True)
    .option("ignoreTrailingWhiteSpace", True)
    .csv(S3_IN, multiLine=False)
)

# from Parquet files
S3_PARQUET = "s3://mybucket/folder1/dt=2020-08-24-19-28/"
df = spark.read.parquet(S3_PARQUET)

# from JSON files
df = spark.read.json(S3_JSON)

# from a multiline JSON file
df = spark.read.json(S3_JSON, multiLine=True)

2. GlueContext

GlueContext is the entry point for reading and writing DynamicFrames in AWS Glue. It wraps the Apache SparkSQL SQLContext object providing mechanisms for interacting with the Apache Spark platform.

from awsglue.job import Job
from awsglue.transforms import *
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame

glueContext = GlueContext(SparkContext.getOrCreate())

3. DynamicFrame

An AWS Glue DynamicFrame is similar to a SparkSQL DataFrame. It represents a distributed collection of data without requiring you to specify a schema up front. It can also be used to read and transform data that contains inconsistent values and types.

A DynamicFrame can be created using the following options:

  • create_dynamic_frame_from_rdd — created from an Apache Spark Resilient Distributed Dataset (RDD)
  • create_dynamic_frame_from_catalog — created using a Glue catalog database and table name
  • create_dynamic_frame_from_options — created with a specified connection and format, for example a connection type such as Amazon S3, Amazon Redshift, or JDBC

DynamicFrames can be converted to and from DataFrames using .toDF() and fromDF(). Use the following syntax-

# create DynamicFrame from S3 Parquet files
datasource0 = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    connection_options={"paths": [S3_location]},
    format="parquet",
    transformation_ctx="datasource0")

# create DynamicFrame from the Glue catalog
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="demo",
    table_name="testtable",
    transformation_ctx="datasource0")

# convert to a Spark DataFrame
df1 = datasource0.toDF()

# convert back to a Glue DynamicFrame
df2 = DynamicFrame.fromDF(df1, glueContext, "df2")

You can read more about this here.

4. AWS Glue Job Bookmark

AWS Glue job bookmarks help process incremental data when rerunning a job on a scheduled interval, preventing the reprocessing of old data.
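
The exact setup depends on how the job is created, but as a rough sketch (assuming the job parameter --job-bookmark-option is set to job-bookmark-enable, and reusing the placeholder database and table names from the earlier example), a bookmarked job looks like this:

import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glueContext = GlueContext(SparkContext.getOrCreate())

job = Job(glueContext)
job.init(args["JOB_NAME"], args)   # loads the bookmark state from the previous run

# transformation_ctx is the key the bookmark uses to track what has already been processed
datasource = glueContext.create_dynamic_frame.from_catalog(
    database="demo",                # placeholder database
    table_name="testtable",         # placeholder table
    transformation_ctx="datasource")

# ... transforms and writes go here ...

job.commit()                        # persists the bookmark state for the next run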

You can read more about this here. Also, you can read this.

5. Write out data

The DynamicFrame of the transformed dataset can be written out to S3 as non-partitioned (default) or partitioned data. The “partitionKeys” parameter can be specified in connection_options to write the data out to S3 as partitions. AWS Glue organizes these datasets in Hive-style partitions.

In the following code example, the AWS Glue DynamicFrame is partitioned by year, month, day, and hour, and written in Parquet format as Hive-style partitions to S3.

s3://bucket_name/table_name/year=2020/month=7/day=13/hour=14/part-000–671c.c000.snappy.parquet

S3_location = "s3://bucket_name/table_name"

datasink = glueContext.write_dynamic_frame_from_options(
    frame=data,
    connection_type="s3",
    connection_options={
        "path": S3_location,
        "partitionKeys": ["year", "month", "day", "hour"]
    },
    format="parquet",
    transformation_ctx="datasink")

You can read more about this here.

6. “glueparquet” format option

glueparquet is a performance-optimized Apache parquet writer type for writing DynamicFrames. It computes and modifies the schema dynamically.

datasink = glueContext.write_dynamic_frame_from_options(
    frame=dynamicframe,
    connection_type="s3",
    connection_options={
        "path": S3_location,
        "partitionKeys": ["year", "month", "day", "hour"]
    },
    format="glueparquet",
    format_options={"compression": "snappy"},
    transformation_ctx="datasink")

You can read more about this here.

7. S3 Lister and other options for optimizing memory management

AWS Glue provides an optimized mechanism for listing files on S3 while reading data into a DynamicFrame. It can be enabled by setting the additional_options parameter “useS3ListImplementation” to true, as in the sketch below.
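
As a minimal sketch (reusing the placeholder database and table names from above), the option is passed through additional_options when reading from the catalog:

datasource = glueContext.create_dynamic_frame.from_catalog(
    database="demo",                 # placeholder database
    table_name="testtable",          # placeholder table
    additional_options={"useS3ListImplementation": True},
    transformation_ctx="datasource")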

You can read more about this here.

8. Purge S3 path

purge_s3_path is a nice option available to delete files from a specified S3 path recursively, based on a retention period or other available filters. As an example, suppose you are running an AWS Glue job that fully refreshes a table every day, writing the data to S3 under the naming convention s3://bucket-name/table-name/dt=<date-time>. Based on the defined retention period, you can delete the dt=<date-time> S3 folders from the Glue job itself. Another option is to set an S3 bucket lifecycle policy with the prefix.

# purge locations older than 3 days
print("Attempting to purge S3 path with retention set to 3 days.")
glueContext.purge_s3_path(
    s3_path=output_loc,
    options={"retentionPeriod": 72})

Other options such as purge_table, transition_table, and transition_s3_path are also available. The transition_table option transitions the storage class of the files stored on Amazon S3 for the specified catalog database and table.
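
As a hedged sketch of the purge_table variant (placeholder database and table names; retentionPeriod is expressed in hours, so 72 corresponds to 3 days):

glueContext.purge_table(
    database="demo",                 # placeholder database
    table_name="testtable",          # placeholder table
    options={"retentionPeriod": 72}) # keep only the last 3 days of data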

You can read more about this here.

9. Relationalize Class

The Relationalize class can help flatten nested JSON: it flattens nested structures into top-level columns and pivots nested array columns out into separate frames.
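
A minimal sketch, assuming datasource0 holds nested JSON and using a hypothetical S3 staging location for Relationalize's temporary output:

from awsglue.transforms import Relationalize

# Relationalize returns a DynamicFrameCollection: the "root" frame holds the
# flattened top-level fields, and each nested array is pivoted out into its
# own frame that can be joined back using the generated keys.
dfc = Relationalize.apply(
    frame=datasource0,
    staging_path="s3://mybucket/glue-temp/",   # hypothetical staging location
    name="root",
    transformation_ctx="relationalize")

flattened = dfc.select("root")
flattened.printSchema()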

You can read more about this here.

10. Unbox Class

The Unbox class unboxes (reparses) a string field in a DynamicFrame into the specified format type, such as JSON.
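
As a small sketch, assuming a hypothetical string column raw_json that contains JSON text:

from awsglue.transforms import Unbox

# parse the JSON text in the "raw_json" string field into a nested struct
unboxed = Unbox.apply(frame=datasource0, path="raw_json", format="json")
unboxed.printSchema()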

You can read more about this here.

11. Unnest Class

The Unnest class flattens nested objects to top-level elements in a DynamicFrame.

root
|-- id: string
|-- type: string
|-- content: map
| |-- keyType: string
| |-- valueType: string

With the content attribute/column being a map type, we can use the Unnest class to unnest each key element into its own top-level column.

unnested = UnnestFrame.apply(frame=data_dynamic_dframe)
unnested.printSchema()

root
|-- id: string
|-- type: string
|-- content.dateLastUpdated: string
|-- content.creator: string
|-- content.dateCreated: string
|-- content.title: string

12. printSchema()

To print the Spark or Glue DynamicFrame schema in tree format use printSchema().

datasource0.printSchema()

root
|-- ID: int
|-- Name: string
|-- Identity: string
|-- Alignment: string
|-- EyeColor: string
|-- HairColor: string
|-- Gender: string
|-- Status: string
|-- Appearances: int
|-- FirstAppearance: choice
| |-- int
| |-- long
| |-- string
|-- Year: int
|-- Universe: string

13. Fields Selection

select_fields can be used to select fields from a Glue DynamicFrame.

# From DynamicFrame
datasource0.select_fields(["Status", "HairColor"]).toDF().distinct().show()

To select fields from a Spark DataFrame, use “select”:

# From DataFrame
datasource0_df.select(["Status", "HairColor"]).distinct().show()

14. Timestamp

For instance, suppose an application writes data into DynamoDB and has a last_updated attribute/column. DynamoDB does not natively support a date/timestamp data type, so you could store it either as a String or as a Number. If stored as a number, it is usually done as epoch time, the number of seconds since 00:00:00 UTC on 1 January 1970. You could see something like “1598331963”, which is 2020-08-25T05:06:03+00:00 in ISO 8601.

You can read more about Timestamp here.

How can you convert it to a timestamp?

When you read the data using AWS Glue DynamicFrame and view the schema, it will show it as “long” data type.

root
|-- version: string
|-- item_id: string
|-- status: string
|-- event_type: string
|-- last_updated: long

To convert the last_updated long data type into a timestamp data type, you can use the following code:

import pyspark.sql.functions as f
import pyspark.sql.types as t

new_df = (
    df
    .withColumn(
        "last_updated",
        # divide by 1000 when the value is stored as epoch milliseconds;
        # drop the division if it is stored as epoch seconds
        f.from_unixtime(f.col("last_updated") / 1000).cast(t.TimestampType()))
)

15. Temporary View from Spark DataFrame

If you want to store the Spark DataFrame as a table and query it using Spark SQL, you can convert the DataFrame into a temporary view that is available only for that Spark session using createOrReplaceTempView.

df = spark.createDataFrame(
    [
        (1, ['a', 'b', 'c'], 90.00),
        (2, ['x', 'y'], 99.99),
    ],
    ['id', 'event', 'score'])

df.printSchema()

root
 |-- id: long (nullable = true)
 |-- event: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- score: double (nullable = true)

df.createOrReplaceTempView("example")
spark.sql("select * from example").show()

+---+---------+-----+
| id| event|score|
+---+---------+-----+
| 1|[a, b, c]| 90.0|
| 2| [x, y]|99.99|
+---+---------+-----+

16. Extract element from ArrayType

Suppose from the above example, you want to create a new attribute/column to store only the last event. How would you do it?

You use the element_at function. It returns the element of the array at the given index if col is an array. It can also be used to extract the value for a given key if col is a map.

from pyspark.sql.functions import element_at

newdf = df.withColumn("last_event", element_at("event", -1))
newdf.printSchema()

root
 |-- id: long (nullable = true)
 |-- event: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- score: double (nullable = true)
 |-- last_event: string (nullable = true)

newdf.show()
+---+---------+-----+----------+
| id| event|score|last_event|
+---+---------+-----+----------+
| 1|[a, b, c]| 90.0| c|
| 2| [x, y]|99.99| y|
+---+---------+-----+----------+

17. explode

The explode function in PySpark is used to explode array or map columns into rows. For example, let’s try to explode the “event” column from the above example:

from pyspark.sql.functions import explode

df1 = df.select(df.id, explode(df.event))
df1.printSchema()

root
 |-- id: long (nullable = true)
 |-- col: string (nullable = true)

df1.show()
+---+---+
| id|col|
+---+---+
| 1| a|
| 1| b|
| 1| c|
| 2| x|
| 2| y|
+---+---+

18. getField

In a Struct type, if you want to get a field by name, you can use “getField”. The following is its syntax-

import pyspark.sql.functions as f
from pyspark.sql import Row

df = spark.createDataFrame([
    Row(attributes=Row(Name='scott', Height=6.0, Hair='black')),
    Row(attributes=Row(Name='kevin', Height=6.1, Hair='brown'))])

df.printSchema()

root
 |-- attributes: struct (nullable = true)
 |    |-- Hair: string (nullable = true)
 |    |-- Height: double (nullable = true)
 |    |-- Name: string (nullable = true)

df.show()
+-------------------+
| attributes|
+-------------------+
|[black, 6.0, scott]|
|[brown, 6.1, kevin]|
+-------------------+

df1 = (
    df
    .withColumn("name", f.col("attributes").getField("Name"))
    .withColumn("height", f.col("attributes").getField("Height"))
    .drop("attributes")
)

df1.show()
+-----+------+
| name|height|
+-----+------+
|scott| 6.0|
|kevin| 6.1|
+-----+------+

19. startswith

In case you want to find records based on a string match, you can use “startswith”.

In the following example, I am searching for all records where the value of the description column starts with “[{”.

import pyspark.sql.functions as f

df.filter(f.col("description").startswith("[{")).show()

20. Extract year, month, day, hour

One of the common use cases is to write the AWS Glue DynamicFrame or Spark DataFrame to S3 in Hive-style partitions. To do so, you can extract the year, month, day, and hour and use them as partition keys when writing the DynamicFrame/DataFrame to S3.

import pyspark.sql.functions as f

df2 = (
    raw_df
    .withColumn('year', f.year(f.col('last_updated')))
    .withColumn('month', f.month(f.col('last_updated')))
    .withColumn('day', f.dayofmonth(f.col('last_updated')))
    .withColumn('hour', f.hour(f.col('last_updated')))
)

About the Author

Anand Prakash – 5x AWS Certified | 5x Oracle Certified

Avid learner of technology solutions around databases, big data, and machine learning.
Connect on Twitter @anandp86

Source: https://www.analyticsvidhya.com/blog/2020/08/essential-functionalities-to-guide-you-while-using-aws-glue-and-pyspark/
