
PySpark Tutorial for Beginners

PySpark is the Python API for Apache Spark, an open-source distributed computing system. It lets you write Spark applications in Python, giving you access to Spark’s capabilities from the Python language.

Below are some key properties of PySpark.

  1. Distributed Computing: Apache Spark is designed for distributed data processing, and PySpark enables Python developers to harness the power of Spark for large-scale data processing and analytics.
  2. API for Python: PySpark provides a Python API that allows developers to interact with Spark using Python code. This includes accessing Spark’s core functionality, libraries, and distributed data structures.
  3. Ease of Use: For Python developers, PySpark offers a familiar syntax and programming model. It allows them to leverage the capabilities of Spark without having to learn a new language.
  4. Integration with Python Ecosystem: PySpark integrates well with the Python ecosystem. You can use popular Python libraries and tools alongside PySpark for tasks like data analysis, machine learning, and visualization.
  5. DataFrames and Spark SQL: PySpark introduces the concept of DataFrames, which provide a higher-level abstraction for working with structured data. It also supports Spark SQL, allowing developers to run SQL queries on distributed datasets (see the short sketch just after this list).
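As a quick taste of the DataFrame API mentioned in the last point, here is a minimal sketch (the app name, data, and column names are made up for illustration; the full setup is covered in the sections below):

from pyspark.sql import SparkSession

# Start a Spark session
spark = SparkSession.builder.appName("quick-taste").getOrCreate()

# A tiny DataFrame, filtered with the DataFrame API
people = spark.createDataFrame([("Alice", 25), ("Bob", 30)], ["Name", "Age"])
people.filter(people.Age > 26).show()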

1. Create a simple Spark DataFrame

If you are running inside an AWS Glue job, you typically create the Spark and Glue contexts like this:

from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import SparkSession

# Create a SparkContext
sc = SparkContext()

# Create a GlueContext on top of it
glueContext = GlueContext(sc)

# Create a SparkSession (here with the session time zone set to UTC)
spark = SparkSession.builder.config("spark.sql.session.timeZone", "UTC").getOrCreate()

Outside Glue, a plain SparkSession is all you need:

from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

# Sample data
data = [("Alice", 25), ("Bob", 30), ("Charlie", 22)]

# Define schema
schema = ["Name", "Age"]

# Create a DataFrame
df = spark.createDataFrame(data, schema=schema)

# Show the DataFrame
df.show()
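
If the session is set up correctly, df.show() should print something like:

+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 22|
+-------+---+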

2. Read data from a table using spark.sql

To read data from a table using Spark SQL, you can leverage the SparkSession’s SQL interface. For instance, spark.sql("SELECT * FROM your_table") retrieves the entire content of the specified table, providing a convenient way to interact with structured data stored in a distributed environment.

df = spark.sql("""SELECT DISTINCT * FROM databasename.your_table""")
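
If the table you want to query lives in a DataFrame rather than in a Glue/Hive catalog, you can register it as a temporary view first and query it the same way (a small sketch; people_df stands for any DataFrame with Name and Age columns, such as the one created in section 1):

# Register the DataFrame as a temporary view so spark.sql can query it
people_df.createOrReplaceTempView("people_view")
spark.sql("SELECT Name, Age FROM people_view WHERE Age > 24").show()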

Now we can create a new column by concatenating two columns (concat comes from pyspark.sql.functions).

from pyspark.sql.functions import concat

df = df.withColumn("Key", concat(df.column1, df.column2))
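
If you need a separator between the values, concat_ws is a handy alternative (a small sketch using the same illustrative column1 and column2 names):

from pyspark.sql.functions import concat_ws

# Join the two columns with an underscore between them
df = df.withColumn("Key", concat_ws("_", df.column1, df.column2))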

Apply a filter to the DataFrame (col comes from pyspark.sql.functions)

from pyspark.sql.functions import col

df.filter(col("col1") == 49067017).show()

To store the filtered result in a new DataFrame, assign the filter expression itself; .show() only prints the rows and returns None, so call it separately.

df_new = df.filter(col("gender") == 'Male')
df_new.show()

Parse a string column into a date (to_date comes from pyspark.sql.functions)

from pyspark.sql.functions import to_date

dfnew = dfnew.withColumn('CCDATE1', to_date(dfnew.CCDATE, 'yyyyMMdd'))
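
Going the other way, date_format renders a date column as a formatted string (a small sketch; the 'dd-MM-yyyy' pattern and the CCDATE_STR column name are just illustrative):

from pyspark.sql.functions import date_format

# Render the parsed date as a 'dd-MM-yyyy' string
dfnew = dfnew.withColumn('CCDATE_STR', date_format(dfnew.CCDATE1, 'dd-MM-yyyy'))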

Join between two Spark DataFrames

dfnew = dfnew.join(df, on="ID", how='left')

Join two Spark DataFrames on multiple columns (note that every condition refers to the same right-hand DataFrame, df)

joined_table = dfnew.join(
    df,
    (dfnew["ID"] == df["ID"]) &
    (dfnew["CCDATE1"] >= df["EFFDATE2"]) &
    (dfnew["prodtyp"] == df["ptyp"]) &
    (dfnew["startdate"] <= df["DTETRM2"]),
    "left"
)

Create a rank by sorting the DataFrame based on date for every ID

from pyspark.sql import Row
from pyspark.sql.functions import col, rank
from pyspark.sql.window import Window

data = [
    Row(ID=1, CCDATE1="2022-01-15", Value=10),
    Row(ID=1, CCDATE1="2022-02-20", Value=15),
    Row(ID=2, CCDATE1="2021-12-10", Value=25),
    Row(ID=2, CCDATE1="2022-03-05", Value=18),
    Row(ID=3, CCDATE1="2022-01-05", Value=30),
]
df = spark.createDataFrame(data)

# Define the window specification
window_spec = Window.partitionBy("ID").orderBy(col("CCDATE1").desc())

# Add a rank column based on CCDATE1 in decreasing order
df_ranked = df.withColumn("rank", rank().over(window_spec))

# Show the result
df_ranked.show(truncate=False)
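
Two related window functions are often used with the same window_spec: row_number gives a unique sequence even when dates tie, and dense_rank leaves no gaps after ties (a quick sketch reusing the DataFrame above):

from pyspark.sql.functions import dense_rank, row_number

df_ranked = (df_ranked
    .withColumn("row_num", row_number().over(window_spec))
    .withColumn("dense_rank", dense_rank().over(window_spec)))
df_ranked.show(truncate=False)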

Drop columns from a Spark DataFrame

colstodrop = ["Key", "pkgnum01", "subpkgid", "StartDate"]
dfnew3 = dfnew3.drop(*colstodrop)
dfnew3.printSchema()

The printSchema() call above prints the column names and their data types.

Add a new date column with a fixed date (lit and DateType need to be imported)

from pyspark.sql.functions import lit
from pyspark.sql.types import DateType

cutoff_end_date = lit('2023-03-31').cast(DateType())
dfnew = dfnew3.withColumn('CutoffEndDate', cutoff_end_date)
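
If you want the date the job ran rather than a hard-coded cutoff, current_date() is an alternative (a small sketch; the LoadDate column name is just illustrative):

from pyspark.sql.functions import current_date

# Stamp each row with the current date
dfnew = dfnew.withColumn('LoadDate', current_date())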

Create a new column based on a condition in a Spark DataFrame (when and col come from pyspark.sql.functions)

df = df.withColumn(
    'DTETRM2',
    when(col('DTETRM') == '99999999', '20401231').otherwise(col('DTETRM'))
)

Drop duplicates from a Spark DataFrame and store the result in a new DataFrame

joined_table1=joined_table1.dropDuplicates()

Drop rows with null values in specific columns of a Spark DataFrame

joined_table1=joined_table1.dropna(subset=["CLNTNUM"])

Create a set of conditions and apply them to the Spark DataFrame (here Tenure is derived from the "No of Days" column)

conditions = [
    (col("No of Days") / 365 < 1.25, 1),
    ((col("No of Days") / 365 >= 1.25) & (col("No of Days") / 365 < 2.25), 2),
    (col("No of Days") / 365 >= 2.25, 3)
]

# Use the 'when' function to apply the conditions and create the 'Tenure' column
joined_table1 = joined_table1.withColumn(
    "Tenure",
    when(conditions[0][0], conditions[0][1])
    .when(conditions[1][0], conditions[1][1])
    .when(conditions[2][0], conditions[2][1])
    .otherwise(None)
)

3. Explode data in a Spark DataFrame

The explode function in PySpark is used to transform a column containing arrays or maps into multiple rows, duplicating the values of other columns for each element in the array or map. In the provided example, RES_GMHD_CHDR_CLNT DataFrame is augmented with a new column ‘repeat’, where each row is duplicated based on the value in the ‘Tenure’ column. This is achieved by exploding a sequence of integers from 1 to ‘Tenure’. The explode function is essential for unnesting nested structures, enabling efficient analysis of array or map data within a DataFrame.

from pyspark.sql.functions import explode, expr

RES_GMHD_CHDR_CLNT = RES_GMHD_CHDR_CLNT.withColumn('repeat', explode(expr("sequence(1, Tenure)")))
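
For a quick feel of what this does, here is a minimal self-contained sketch (the demo DataFrame and its ID/Tenure values are made up for illustration):

from pyspark.sql.functions import explode, expr

demo = spark.createDataFrame([(1, 2), (2, 3)], ["ID", "Tenure"])
# Each ID is repeated Tenure times, with repeat running from 1 to Tenure
demo.withColumn("repeat", explode(expr("sequence(1, Tenure)"))).show()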

User-Defined Functions (UDFs) in PySpark

A User-Defined Function (UDF) in PySpark is a custom function written by a user to perform specific operations on DataFrame columns. It extends PySpark’s capabilities by allowing users to apply their own logic in a distributed computing environment. To apply a UDF to a Spark DataFrame, define the function in Python, register it as a UDF using udf() from pyspark.sql.functions, and then use withColumn to create a new column applying the UDF. This enables users to leverage the flexibility of Python functions while harnessing the scalability of Spark for data processing tasks.

from datetime import timedelta

from pyspark.sql.functions import udf
from pyspark.sql.types import DateType

def add_days(repeat, ccd):
    if repeat == 2:
        return ccd + timedelta(days=365)
    else:
        return ccd

add_days_udf = udf(add_days, DateType())
df = df.withColumn('New_Start_Date', add_days_udf(df['repeat'], df['CCDATE1']))

Union of Spark DataFrames

The union operation in Spark combines two or more DataFrames vertically, stacking their rows. It requires the DataFrames to have the same schema (columns are matched by position). Unlike the SQL UNION operation, it does not eliminate duplicates; it behaves like UNION ALL, so call dropDuplicates() afterwards if you need distinct rows.

result = df1.union(df2).union(df3)
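
A minimal follow-up if you do want SQL UNION (distinct) semantics:

# union keeps duplicates; dropDuplicates() gives UNION-style distinct rows
result_distinct = df1.union(df2).union(df3).dropDuplicates()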

Least Function in PySpark

from pyspark.sql.functions import least, col

Final_Data = Final_Data.withColumn('FY_Act_End1', least(col('FY_Act_End'), col('CutoffEndDate')))

The PySpark code above adds an 'FY_Act_End1' column to the 'Final_Data' DataFrame holding the earlier of the 'FY_Act_End' and 'CutoffEndDate' dates, which is useful for time-related analyses with an adjusted fiscal year end date.
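
The counterpart greatest() picks the later value (a small sketch; FY_Act_Start and CutoffStartDate are hypothetical column names used only for illustration):

from pyspark.sql.functions import greatest, col

# Pick the later of the two dates
Final_Data = Final_Data.withColumn('FY_Act_Start1', greatest(col('FY_Act_Start'), col('CutoffStartDate')))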

Write the Spark DataFrame to a CSV file

df.coalesce(1).write.mode("overwrite").option("header", "true").csv("file.csv")

Note that coalesce(1) reduces the DataFrame to a single partition so Spark writes one part file; it is mode("overwrite") that replaces any existing output. Also keep in mind that "file.csv" ends up as a folder containing the part file rather than a single flat file.
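
To read the output back (a quick sketch using the same path):

# Spark treats "file.csv" as a folder of part files and reads them all
df2 = spark.read.option("header", "true").csv("file.csv")
df2.show()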

These are some of the operations we commonly perform in PySpark. Please let us know your feedback.
