PySpark is the Python API for Apache Spark, an open-source distributed computing system. It lets you write Spark applications in the Python programming language, exposing Spark's capabilities through a familiar interface.
Below are some key properties of PySpark.
- Distributed Computing: Apache Spark is designed for distributed data processing, and PySpark enables Python developers to harness the power of Spark for large-scale data processing and analytics.
- API for Python: PySpark provides a Python API that allows developers to interact with Spark using Python code. This includes accessing Spark’s core functionality, libraries, and distributed data structures.
- Ease of Use: For Python developers, PySpark offers a familiar syntax and programming model. It allows them to leverage the capabilities of Spark without having to learn a new language.
- Integration with Python Ecosystem: PySpark integrates well with the Python ecosystem. You can use popular Python libraries and tools alongside PySpark for tasks like data analysis, machine learning, and visualization.
- DataFrames and Spark SQL: PySpark introduces the concept of DataFrames, which provides a higher-level abstraction for working with structured data. It also supports Spark SQL, allowing developers to run SQL queries on distributed datasets.
1. Create a simple Spark DataFrame
```python
from pyspark.sql import SparkSession

# On AWS Glue, the session comes from a GlueContext instead:
# from pyspark.context import SparkContext
# from awsglue.context import GlueContext
# glueContext = GlueContext(SparkContext())

# Create a Spark session
spark = (
    SparkSession.builder
    .appName("example")
    .config("spark.sql.session.timeZone", "UTC")
    .getOrCreate()
)

# Sample data
data = [("Alice", 25), ("Bob", 30), ("Charlie", 22)]

# Define the schema (column names)
schema = ["Name", "Age"]

# Create a DataFrame
df = spark.createDataFrame(data, schema=schema)

# Show the DataFrame
df.show()
```
2. Read data from a table using spark.sql
To read data from a table with Spark SQL, use the SparkSession's SQL interface. For instance, `spark.sql("SELECT * FROM your_table")` retrieves the entire content of the specified table, providing a convenient way to interact with structured data stored in a distributed environment.

```python
df = spark.sql("SELECT DISTINCT * FROM databasename.your_table")
```
Now we can create a new column by concatenating two columns.

```python
from pyspark.sql.functions import concat

# Concatenate column1 and column2 into a new "Key" column
df = df.withColumn("Key", concat(df.column1, df.column2))
```
Apply a filter to the DataFrame.

```python
from pyspark.sql.functions import col

df.filter(col("col1") == 49067017).show()
```
To keep the filtered result, assign the filter expression itself to a new DataFrame. Note that `.show()` only prints and returns `None`, so call it separately rather than assigning its result.

```python
df_new = df.filter(col("gender") == 'Male')
df_new.show()
```
Convert a string column into a date column.

```python
from pyspark.sql.functions import to_date

# Parse the yyyyMMdd string in CCDATE into a DateType column
dfnew = dfnew.withColumn('CCDATE1', to_date(dfnew.CCDATE, 'yyyyMMdd'))
```
Join two Spark DataFrames.

```python
dfnew = dfnew.join(df, on="ID", how="left")
```
Join two Spark DataFrames on multiple columns.

```python
joined_table = dfnew.join(
    df,
    (dfnew["ID"] == df["ID"])
    & (dfnew["CCDATE1"] >= df["EFFDATE2"])
    & (dfnew["prodtyp"] == df["ptyp"])
    & (dfnew["startdate"] <= df["DTETRM2"]),
    "left",
)
```
Create a rank by sorting the DataFrame on date for every ID.

```python
from pyspark.sql import Row
from pyspark.sql.functions import col, rank
from pyspark.sql.window import Window

data = [
    Row(ID=1, CCDATE1="2022-01-15", Value=10),
    Row(ID=1, CCDATE1="2022-02-20", Value=15),
    Row(ID=2, CCDATE1="2021-12-10", Value=25),
    Row(ID=2, CCDATE1="2022-03-05", Value=18),
    Row(ID=3, CCDATE1="2022-01-05", Value=30),
]
df = spark.createDataFrame(data)

# Define the window specification: one partition per ID, newest date first
window_spec = Window.partitionBy("ID").orderBy(col("CCDATE1").desc())

# Add a rank column based on CCDATE1 in decreasing order
df_ranked = df.withColumn("rank", rank().over(window_spec))

# Show the result
df_ranked.show(truncate=False)
```
Drop columns from a Spark DataFrame.

```python
cols_to_drop = ["Key", "pkgnum01", "subpkgid", "StartDate"]
dfnew3 = dfnew3.drop(*cols_to_drop)
dfnew3.printSchema()
```

The `printSchema()` call above prints the column names and their data types.
Add a new date column with a fixed date.

```python
from pyspark.sql.functions import lit
from pyspark.sql.types import DateType

cutoff_end_date = lit('2023-03-31').cast(DateType())
dfnew = dfnew3.withColumn('CutoffEndDate', cutoff_end_date)
```
Create a new column based on a condition.

```python
from pyspark.sql.functions import col, when

# Replace the open-ended termination marker '99999999' with a far-future date
df = df.withColumn(
    'DTETRM2',
    when(col('DTETRM') == '99999999', '20401231').otherwise(col('DTETRM'))
)
```
Drop duplicates from a Spark DataFrame and store the result in a new DataFrame.

```python
joined_table1 = joined_table1.dropDuplicates()
```
Drop null values from a Spark DataFrame.

```python
joined_table1 = joined_table1.dropna(subset=["CLNTNUM"])
```
Create conditions and apply them to the Spark DataFrame.

```python
from pyspark.sql.functions import col, when

conditions = [
    (col("No of Days") / 365 < 1.25, 1),
    ((col("No of Days") / 365 >= 1.25) & (col("No of Days") / 365 < 2.25), 2),
    (col("No of Days") / 365 >= 2.25, 3),
]

# Use 'when' to apply the conditions and create the 'Tenure' column
joined_table1 = joined_table1.withColumn(
    "Tenure",
    when(conditions[0][0], conditions[0][1])
    .when(conditions[1][0], conditions[1][1])
    .when(conditions[2][0], conditions[2][1])
    .otherwise(None),
)
```
3. Explode data in a Spark DataFrame

The `explode` function in PySpark transforms a column containing arrays or maps into multiple rows, duplicating the values of the other columns for each element. In the example below, the RES_GMHD_CHDR_CLNT DataFrame gains a new column 'repeat', and each row is duplicated based on the value in its 'Tenure' column by exploding a sequence of integers from 1 to 'Tenure'. The `explode` function is essential for unnesting nested structures, enabling efficient analysis of array or map data within a DataFrame.

```python
from pyspark.sql.functions import explode, expr

# One output row per integer in [1, Tenure] for every input row
RES_GMHD_CHDR_CLNT = RES_GMHD_CHDR_CLNT.withColumn(
    'repeat', explode(expr("sequence(1, Tenure)"))
)
```
UDF Functions in PySpark

A User-Defined Function (UDF) in PySpark is a custom function written by a user to perform specific operations on DataFrame columns. It extends PySpark's capabilities by letting users apply their own logic in a distributed computing environment. To apply a UDF to a Spark DataFrame, define the function in Python, register it with `udf()` from `pyspark.sql.functions`, and then use `withColumn` to create a new column that applies it. This combines the flexibility of Python functions with the scalability of Spark for data processing tasks.

```python
from datetime import timedelta

from pyspark.sql.functions import udf
from pyspark.sql.types import DateType

def add_days(repeat, ccd):
    # Shift the date forward one year for the second repetition only
    if repeat == 2:
        return ccd + timedelta(days=365)
    return ccd

add_days_udf = udf(add_days, DateType())
df = df.withColumn('New_Start_Date', add_days_udf(df['repeat'], df['CCDATE1']))
```
Union of Spark DataFrames

The union operation combines two or more DataFrames vertically, stacking their rows, and requires the DataFrames to have the same schema. Note that, unlike SQL's UNION, Spark's `union` behaves like UNION ALL and keeps duplicates; call `distinct()` or `dropDuplicates()` afterwards if you need duplicate-free output.

```python
result = df1.union(df2).union(df3)
```
Least Function in PySpark

```python
from pyspark.sql.functions import col, least

Final_Data = Final_Data.withColumn('FY_Act_End1', least(col('FY_Act_End'), col('CutoffEndDate')))
```

The PySpark code above creates the 'FY_Act_End1' column in `Final_Data` as the row-wise minimum of the 'FY_Act_End' and 'CutoffEndDate' columns, so 'FY_Act_End1' always holds the earlier of the two dates. This is useful for time-related analyses with an adjusted fiscal year end date.
Write the Spark DataFrame to a CSV file.

```python
df.coalesce(1).write.mode("overwrite").option("header", "true").csv("file.csv")
```

Note that `coalesce(1)` reduces the DataFrame to a single partition so that a single CSV file is produced; it is `mode("overwrite")` that replaces any existing files in the target folder.
These are some of the operations we most often perform in PySpark. Please share your feedback.