Welcome to the Spark Challenge. You are provided with the Black Friday sales data, and as a big data developer you need to analyse it and fetch the required data.
• We have provided a template in challenge.py. In the challenge files, the input path and output path are already set for each function. Output path : ~/project/challenge/output/.
• Output column names should be the same as given in the sample output.
• Do not modify the output files manually; modified output files will not be taken for validation.
• Your output files should be stored as CSV files.
Spark Shell :- This section is only for those who choose the spark shell instead of the given template. Ignore this part if you are not using the spark shell to solve this challenge.
• You can also solve this challenge with the spark shell, but we recommend using the template given in challenge.py.
For pyspark users :- Open a terminal (Right click -> New Terminal), type pyspark in the terminal, and press Enter. Once the pyspark shell is open, you can import the required functions and perform the operations using pyspark.
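For example, a minimal pyspark-shell session for this challenge might look like the sketch below (the file path matches the input location given later in this document; adjust it if your environment differs). The spark session object is created automatically when the pyspark shell starts.

>>> from pyspark.sql.functions import max as sql_max
>>> df = spark.read.csv("project/challenge/inputfile/Black_Friday_Sales.csv", header=True, inferSchema=True)
>>> df.printSchema()   # inspect the available columns before writing queries
>>> df.groupBy("Age").agg(sql_max("Purchase").alias("Maximum_Purchase")).show()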
Note:
1. In the template we have defined the functions and the input/output file paths using parameters. You do not need to change the function signatures or parameters; focus on implementing the given operations.
Output directory for the output files: -
1. result_1 : project/challenge/output/result_1
2. result_2 : project/challenge/output/result_2
3. result_3 : project/challenge/output/result_3
• The input file contains the details of the Black Friday sales.
• You are given Black_Friday_Sales.csv in project/challenge/inputfile/.
Complete the following operations inside the given functions :
Note : Each output should be a single-partition CSV file with a header.
Task 0:-
# Imports used across the tasks; max and sum are aliased to avoid shadowing Python's built-ins.
from pyspark.sql.functions import col, max as sql_max, sum as sql_sum

# The read_data function is needed by all the tasks, since it loads the input dataframe.
def read_data(spark, input_file):
    '''
    spark : the active SparkSession
    input_file : path to the input CSV file
    '''
    # Read the CSV with a header row and let Spark infer the column types
    df = spark.read.csv(input_file, header=True, inferSchema=True)
    return df
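If you want to exercise read_data outside the template, a hedged sketch of a driver is shown below; the SparkSession setup and app name are illustrative, since the template already creates the session for you.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("BlackFridayChallenge").getOrCreate()   # app name is illustrative
input_df = read_data(spark, "project/challenge/inputfile/Black_Friday_Sales.csv")
input_df.show(5)   # quick sanity check on the first few rows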
Task 1 : -
# Complete the following operations inside the result_1 function:
# The following are the parameters :-
# ◦ input_file : input_df
1. Use the input_df to complete the task.
2. Find the highest purchase amount for each of the given age groups.
3. The column holding the highest purchase should be named "Maximum_Purchase".
4. Columns to be fetched : Age, Maximum_Purchase
5. Return the final dataframe.
def result_1(input_df):
    '''
    input_df : the input dataframe
    '''
    print("-------------------")
    print("Starting result_1")
    print("-------------------")
    #----------------------------------
    # Complete the following operations inside the result_1 function:
    # The following are the parameters :-
    # ◦ input_file : input_df
    # 1. Use the input_df to complete the task.
    # 2. Find the highest purchase amount for each of the given age groups.
    # 3. The column holding the highest purchase should be named "Maximum_Purchase".
    # 4. Columns to be fetched : Age, Maximum_Purchase
    # 5. In the challenge file, the return statement is already defined.
    #    You need to replace df with the name of your final output dataframe.
    #----------------------------------
    # Group by age bucket and take the maximum purchase in each group
    df = input_df.groupBy("Age").agg(sql_max("Purchase").alias("Maximum_Purchase"))
    return df  # return the final dataframe
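Before submitting, you may want to sanity-check the result locally; a small sketch follows (the variable names are arbitrary):

result_df = result_1(input_df)
result_df.printSchema()   # should list exactly two columns: Age, Maximum_Purchase
result_df.show()          # one row per age group with its highest purchase amount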
Task 2 :-
# Complete the following operations inside the result_2 function:
# The following are the parameters :-
# ◦ input_file : input_df
1. Use input_df to complete the task.
2. Find the total sum of the purchases made by each city category.
3. The column holding the sum of the purchase amounts should be named "Total_sum".
4. Columns to be fetched : City_Category, Total_sum
5. Return the final dataframe.
def result_2(input_df):
    '''
    input_df : the input dataframe
    '''
    print("-------------------------")
    print("Starting result_2")
    print("-------------------------")
    #-------------------------------------------------------------------------
    # The following are the parameters :-
    # ◦ input_file : input_df
    # 1. Use input_df to complete the task.
    # 2. Find the total sum of the purchases made by each city category.
    # 3. The column holding the sum of the purchase amounts should be named "Total_sum".
    # 4. Columns to be fetched : City_Category, Total_sum
    # 5. In the challenge file, the return statement is already defined.
    #    You need to replace df with the name of your final output dataframe.
    #-------------------------------------------------------------------------
    # Group by city category and total the purchase amounts in each group
    df = input_df.groupBy("City_Category").agg(sql_sum("Purchase").alias("Total_sum"))
    return df  # return the final dataframe
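If you prefer Spark SQL over the DataFrame API, an equivalent query for this task could look like the sketch below; the temporary view name "sales" is arbitrary, and spark refers to the active session (available by default in the pyspark shell):

input_df.createOrReplaceTempView("sales")   # "sales" is an arbitrary view name
df = spark.sql(
    "SELECT City_Category, SUM(Purchase) AS Total_sum "
    "FROM sales GROUP BY City_Category"
)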
Task 3 :-
# Complete the following operations inside the result_3 function:
# The following are the parameters :-
# ◦ input_file : input_df
1. Use the input_df to complete the task.
2. Fetch the records where the Purchase is greater than 1000 and the Marital Status is "Single".
3. Columns to be fetched : User_ID, Product_ID, Marital_Status, Purchase.
4. Return the final dataframe.
def result_3(input_df):
    '''
    input_df : the input dataframe
    '''
    print("-------------------------")
    print("Starting result_3")
    print("-------------------------")
    #--------------------------------------------------------------------------
    # The following are the parameters :-
    # ◦ input_file : input_df
    # 1. Use the input_df to complete the task.
    # 2. Fetch the records where the Purchase is greater than 1000 and the
    #    Marital Status is "Single".
    # 3. Columns to be fetched : User_ID, Product_ID, Marital_Status, Purchase
    # 4. In the challenge file, the return statement is already defined.
    #    You need to replace df with the name of your final output dataframe.
    #--------------------------------------------------------------------------
    # Filter records where Purchase > 1000 and Marital_Status is "Single"
    filtered_df = input_df.filter((col("Purchase") > 1000) & (col("Marital_Status") == "Single"))
    # Select the required columns
    selected_columns_df = filtered_df.select("User_ID", "Product_ID", "Marital_Status", "Purchase")
    # Remove duplicate records
    final_df = selected_columns_df.dropDuplicates()
    return final_df  # return the final dataframe
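A quick way to check that the filter behaves as intended (variable names are arbitrary):

result_df = result_3(input_df)
result_df.select("Purchase").summary("min").show()      # the minimum Purchase should be above 1000
result_df.select("Marital_Status").distinct().show()    # should contain only "Single"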
def load_data(data, outputpath):
    #------------------------------------------------------------------------
    # 1. Write code to store the outputs to the respective locations.
    # Note:
    #   • Output files should be single-partition CSV files with a header.
    #   • The load_data function is important for all the tasks.
    #------------------------------------------------------------------------
    if data.count() != 0:
        print("Loading the data", outputpath)
        # Save the dataframe as a single-partition CSV file with a header row
        data.coalesce(1).write.csv(outputpath, header=True)
    else:
        print("Empty dataframe, hence cannot save the data", outputpath)
NOTE :
Output Path :- ~/project/challenge/output/
Input Path :- ~/project/challenge/inputfile/
How to run a sample test case :
• Open a terminal and navigate to ~/project/challenge. Run the following command in the terminal : spark-submit sampletest.py
Note :
• The sample test case does not represent the main test. The actual test cases will run only after you click the SUBMIT button.
• Click SUBMIT to validate your solution.