Q1: Write PySpark code to create a DataFrame and print it with 'color' and 'weight' as separate columns.
inventoryData = [
('Laptop', 20, {'color': 'silver', 'weight': 2.5}),
('Phone', 50, {'color': 'black', 'weight': 0.5}),
('Tablet', 30, {'color': 'white', 'weight': 0.8}),
('Chair', 10, {'color': 'brown', 'weight': 3.0}),
('Printer', 15, {'color': 'gray', 'weight': 7.5}),
]
schema = ["Product", "Quantity", "Properties"]
Expected output:

| Product | Quantity | color  | weight |
|---------|----------|--------|--------|
| Laptop  | 20       | silver | 2.5    |
| Phone   | 50       | black  | 0.5    |
| Tablet  | 30       | white  | 0.8    |
| Chair   | 10       | brown  | 3.0    |
| Printer | 15       | gray   | 7.5    |
Solution
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(inventoryData, schema)
result_df = df.withColumn("color", df.Properties["color"]) \
    .withColumn("weight", df.Properties["weight"]) \
    .drop("Properties")
result_df.show()

Q2: Write the query in PySpark as well as in SQL to get the required output.

PySpark Implementation
from pyspark.sql.functions import *
result_df = df.groupBy("city").agg(
    count("gender").alias("total_count"),
    count(when(df.gender == "M", 1)).alias("Male_count"),
    count(when(df.gender == "F", 1)).alias("Female_count")
).orderBy(desc("total_count"))
SQL Implementation
SELECT city,
       COUNT(gender) AS total_count,
       COUNT(CASE WHEN gender = 'M' THEN 1 END) AS Male_count,
       COUNT(CASE WHEN gender = 'F' THEN 1 END) AS Female_count
FROM emp
GROUP BY city
ORDER BY COUNT(gender) DESC;
Q3: Write an SQL query to find the percentage of immediate orders in the table. Also find the same in PySpark.

PySpark Implementation
percentage_of_immediate_deliveries = (
    df.filter(df.order_date == df.customer_pref_delivery_date).count() / df.count()
) * 100
print(percentage_of_immediate_deliveries)
SQL Implementation
SELECT COUNT(CASE WHEN order_date = customer_pref_delivery_date THEN 1 END)
       / CAST(COUNT(*) AS FLOAT) * 100 AS percentage_of_immediate_deliveries
FROM Delivery;
Q4: Split the pipe-delimited marks column into separate physics, chemistry, and maths columns.
Input Data

Output Data

PySpark Implementation
sampleData = [(1, "A", 20, "31|32|34"),
              (2, "B", 21, "21|32|43"),
              (3, "C", 22, "21|32|11"),
              (4, "D", 23, "10|12|12")]
schema = ["id", "name", "age", "marks"]
df = spark.createDataFrame(sampleData, schema)

from pyspark.sql.functions import *
df = df.withColumn("physics", split(df.marks, "\\|")[0]) \
       .withColumn("chemistry", split(df.marks, "\\|")[1]) \
       .withColumn("maths", split(df.marks, "\\|")[2]) \
       .drop("marks")
SQL Implementation
SELECT
id,
name,
age,
CAST(SUBSTRING(marks, 1, CHARINDEX('|', marks) - 1) AS INT) AS physics,
CAST(SUBSTRING(marks, CHARINDEX('|', marks) + 1, CHARINDEX('|', marks, CHARINDEX('|', marks) + 1) - CHARINDEX('|', marks) - 1) AS INT) AS chemistry,
CAST(SUBSTRING(marks, CHARINDEX('|', marks, CHARINDEX('|', marks) + 1) + 1, LEN(marks) - CHARINDEX('|', marks, CHARINDEX('|', marks) + 1)) AS INT) AS maths
FROM Students;
Q5: How would you calculate the month-wise cumulative revenue using PySpark?

OUTPUT DATAFRAME:
NOTE:
1. Month values are month names, not month numbers.

PySpark Implementation
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# parse "dd-MMMM" strings (day and month name, no year), then keep
# just the month name
df2 = df.withColumn("month", to_date(df.date, "dd-MMMM"))
df2 = df2.withColumn("month", date_format(df2.month, "MMMM"))

window_specification = Window.partitionBy("month").orderBy("revenue")
result_df = df2.withColumn("cumulativeSum", sum("revenue").over(window_specification))
result_df = result_df.select("revenue", "month", "cumulativeSum")
result_df.show()
SQL Implementation
ALTER TABLE RevenueData
ADD month AS SUBSTRING(date, CHARINDEX('-', date) + 1,
                       LEN(date) - CHARINDEX('-', date));

SELECT revenue, month,
       SUM(revenue) OVER (PARTITION BY month ORDER BY revenue) AS cumm_sum
FROM RevenueData;
Q6. Write a PySpark query and SQL code to report the movies with an odd-numbered ID and a description that is not "boring". Return the result table in descending order by rating.

OUTPUT DATAFRAME :

PySpark Implementation
df.filter((df.ID % 2 != 0) & (df.description != "boring")) \
  .orderBy(df.rating.desc()) \
  .show()
SQL Implementation
SELECT * FROM Movies
WHERE ID % 2 <> 0 AND description <> 'boring'
ORDER BY rating DESC;
Q7. Imagine you have two PySpark DataFrames, 'orders' and 'order_items', representing e-commerce order information and corresponding item details. Your task is to find the top-selling products, ranked by the total quantity sold, along with their corresponding categories.

order_items_df >>

RESULT DATAFRAME :
NOTE:
1. If the total quantity is the same for two products, the product with the smaller product_id should come first, as in the case of 201 and 202.
from pyspark.sql.functions import *

joined_df = orders_df.join(order_items_df, "order_id", "inner")
top_selling_products_df = joined_df.groupBy("product_id", "category") \
    .agg(sum("quantity").alias("Total_quantity")) \
    .orderBy(desc("Total_quantity"), "product_id")
top_selling_products_df.show()
SELECT product_id, category, SUM(quantity) AS Total_quantity
FROM orders AS o
INNER JOIN order_items AS i ON o.order_id = i.order_id
GROUP BY product_id, category
ORDER BY SUM(quantity) DESC, product_id;

Q8. Imagine you have a messy source table containing customer data, and you need to clean and transform it using PySpark. Here’s a simplified version of the source table.

NOTES :
- Replace all non-numeric characters in the phone column with "".
- Fill missing values in the phone column with NULL.
- Rename the name column to Full_name.
- Derive a new age column using registration_date and current_date().
from pyspark.sql.functions import *
from pyspark.sql.types import *

# changing dataType of registration_date column from string to DateType
# (note uppercase MM for month; lowercase mm means minutes)
df = df.withColumn("registration_date", to_date(df.registration_date, "yyyy-MM-dd"))
# replacing any non-numeric character with ''
df = df.withColumn("phone", regexp_replace(df.phone, "[^0-9]", ""))
# turning phone values left empty by the cleanup into Null
df = df.withColumn("phone", when(df.phone == "", None).otherwise(df.phone))
# renaming name column to Full_name
df = df.withColumnRenamed("name", "Full_name")
# deriving age (in days) from registration_date and current_date()
df = df.withColumn("age", datediff(current_date(), df.registration_date))
df.show()