
Lab 06 - Apache Spark SQL

Course: Big Data - IU S23
Author: Firas Jolha


Prerequisites

  • A PC that can run a web browser and has an internet connection

Objectives

  • Learn about the Spark SQL module
  • Practise using the Spark SQL DataFrame API
  • Learn how to run SQL queries in Spark

Dataset Description

This dataset includes 44,341 results of international football matches, from the very first official match in 1872 up to 2022. The matches range from the FIFA World Cup to the FIFI Wild Cup to regular friendly matches. The matches are strictly men's full internationals; the data does not include Olympic Games or matches where at least one of the teams was the nation's B-team, U-23 side, or a league select team.

results.csv includes the following columns:

  • date - date of the match
  • home_team - the name of the home team
  • away_team - the name of the away team
  • home_score - full-time home team score including extra time, not including penalty-shootouts
  • away_score - full-time away team score including extra time, not including penalty-shootouts
  • tournament - the name of the tournament
  • city - the name of the city/town/administrative unit where the match was played
  • country - the name of the country where the match was played
  • neutral - TRUE/FALSE column indicating whether the match was played at a neutral venue

For the scorers and shootouts datasets, you can check this Kaggle data card.

Test

Go to the link to do the test.

Spark SQL

Spark SQL is a Spark module for structured data processing. It allows you to query structured data using either SQL or the DataFrame API.

The pyspark.sql module in Spark is used to perform SQL-like operations on data stored in memory. You can either use the programmatic API to query the data or use ANSI SQL queries similar to an RDBMS. You can also mix both, for example, apply API methods to the result of an SQL query.
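
A minimal sketch of mixing the two styles (it assumes the SparkSession spark and the temporary view results_table, both of which are created later in this lab):

# Run an SQL query first...
sql_result = spark.sql("SELECT home_team, home_score FROM results_table")

# ...then apply DataFrame API methods to its result
sql_result.where("home_score > 3").show(5)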

The following are the important classes of the SQL module.

  • pyspark.sql.SparkSession – SparkSession is the main entry point for DataFrame and SQL functionality.
  • pyspark.sql.DataFrame – DataFrame is a distributed collection of data organized into named columns.
  • pyspark.sql.Column – A column expression in a DataFrame.
  • pyspark.sql.Row – A row of data in a DataFrame.
  • pyspark.sql.GroupedData – An object type that is returned by DataFrame.groupBy().
  • pyspark.sql.DataFrameNaFunctions – Methods for handling missing data (null values).
  • pyspark.sql.DataFrameStatFunctions – Methods for statistics functionality.
  • pyspark.sql.functions – List of standard built-in functions.
  • pyspark.sql.types – Available SQL data types in Spark.
  • pyspark.sql.Window – Used to work with window functions.

Regardless of which approach you use, you have to create a SparkSession, which is the entry point to a Spark application.
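
A minimal sketch of creating one (the application name here is an arbitrary choice):

from pyspark.sql import SparkSession

# Build a new session, or reuse the existing one in this process
spark = SparkSession.builder \
    .appName("lab06-spark-sql") \
    .getOrCreate()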

Spark SQL is one of the most used Spark modules for processing structured, columnar data. Once you have created a DataFrame, you can interact with the data by using SQL syntax. In other words, Spark SQL brings native raw SQL queries to Spark, meaning you can run traditional ANSI SQL on a Spark DataFrame.

In order to use SQL, first register a temporary table/view on the DataFrame using the createOrReplaceTempView() function. Once created, this table can be accessed throughout the SparkSession using sql(), and it is dropped when the SparkSession terminates. Use the sql() method of the SparkSession object to run the query; this method returns a new DataFrame.

Spark SQL Examples

  1. Create SQL View

    • Load the data and read the results dataframe.

      from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DateType

      schema = StructType([
          StructField("date", DateType(), False),
          StructField("home_team", StringType(), False),
          StructField("away_team", StringType(), False),
          StructField("home_score", IntegerType(), False),
          StructField("away_score", IntegerType(), False),
          StructField("tournament", StringType(), False),
          StructField("city", StringType(), False),
          StructField("country", StringType(), False),
          StructField("neutral", BooleanType(), False),
      ])

      # You can also use the spark.read.csv function
      df = spark.read.format("csv").load("results.csv", header=True, schema=schema)
      df

    • Create the temporary view.

      df.createOrReplaceTempView("results_table")
      
  2. Spark SQL to Select Columns

    # DataFrame API select query
    df.select("home_team", "city", "country", "tournament") \
      .show(5)

    # SQL select query
    spark.sql("SELECT home_team, city, country, tournament FROM RESULTS_TABLE") \
         .show(5)
    
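    A related convenience is selectExpr(), which accepts SQL expressions as strings directly in the DataFrame API. A minimal sketch (the alias "venue" is just an illustrative name):

    # selectExpr() takes SQL expressions instead of plain column names
    df.selectExpr("home_team", "city AS venue", "country") \
      .show(5)
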
  3. Filter Rows

    To filter rows from the data, you can use the where() function from the DataFrame API.

    # DataFrame API where()
    df.select("country", "city", "home_team", "tournament") \
      .where("city == 'Moscow'") \
      .show(5)
    

    Similarly, in SQL you can use the WHERE clause as follows.

    # SQL WHERE clause
    spark.sql(""" SELECT country, city, home_team, tournament FROM RESULTS_TABLE
              WHERE city = 'Moscow' """) \
         .show(5)
    
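    You can also combine conditions. A minimal sketch using col() expressions (the particular filter is only illustrative):

    from pyspark.sql.functions import col

    # Combine two conditions with & (wrap each condition in parentheses)
    df.select("country", "city", "home_team") \
      .where((col("city") == "Moscow") & (~col("neutral"))) \
      .show(5)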
    
  4. Sorting

    # Sorting with the DataFrame API
    df.select("country", "city", "home_team", "tournament") \
      .where("city in ('London','Paris','Moscow')") \
      .orderBy("city") \
      .show(10)

    # SQL ORDER BY clause
    spark.sql(""" SELECT country, city, home_team, tournament FROM RESULTS_TABLE
              WHERE city in ('London','Paris','Moscow') ORDER BY city """) \
         .show(10)
    
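    By default, orderBy() sorts in ascending order. A minimal sketch of a descending sort on the same column:

    from pyspark.sql.functions import col

    # Sort descending instead of the default ascending order
    df.select("country", "city", "home_team", "tournament") \
      .where("city in ('London','Paris','Moscow')") \
      .orderBy(col("city").desc()) \
      .show(10)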
    
  5. Grouping

    # Grouping with the DataFrame API
    df.groupBy("city").count() \
      .show()

    # SQL GROUP BY clause
    spark.sql(""" SELECT city, count(*) as count FROM RESULTS_TABLE
              GROUP BY city """) \
         .show()
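
    count() is only one aggregate. A sketch of computing several aggregates per group with agg() (the column aliases are illustrative names):

    from pyspark.sql import functions as F

    # Average and maximum home score per hosting country
    df.groupBy("country") \
      .agg(F.avg("home_score").alias("avg_home_score"),
           F.max("home_score").alias("max_home_score")) \
      .show(5)
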
  6. SQL Join Operations

The PySpark SQL join has the syntax below, and it can be accessed directly from a DataFrame.

join(self, other, on=None, how=None)

The join() operation takes the parameters below and returns a new DataFrame.

  • param other: the right side of the join
  • param on: a string for the join column name, a list of column names, or a join expression (Column)
  • param how: the join type, default inner. Must be one of inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti.

You can also write the join expression using the where() and filter() methods on the DataFrame, and you can join on multiple columns, as shown in the sketch below.
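
A minimal self-contained sketch of a join on multiple columns (the dataframes a and b and their columns are hypothetical, used only for illustration):

a = spark.createDataFrame([(1, "2010", "x")], ["id", "year", "a_val"])
b = spark.createDataFrame([(1, "2010", "y")], ["id", "year", "b_val"])

# Join on two columns by combining equality conditions with &
a.join(b, (a.id == b.id) & (a.year == b.year), "inner") \
 .show(truncate=False)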

  • Create two Spark dataframes
emp = [(1, "Smith", -1, "2018", "10", "M", 3000),
       (2, "Rose", 1, "2010", "20", "M", 4000),
       (3, "Williams", 1, "2010", "10", "M", 1000),
       (4, "Jones", 2, "2005", "10", "F", 2000),
       (5, "Brown", 2, "2010", "40", "", -1),
       (6, "Brown", 2, "2010", "50", "", -1)]
empColumns = ["emp_id", "name", "superior_emp_id", "year_joined",
              "emp_dept_id", "gender", "salary"]

empDF = spark.createDataFrame(data=emp, schema=empColumns)
empDF.printSchema()
empDF.show(truncate=False)

dept = [("Finance", 10),
        ("Marketing", 20),
        ("Sales", 30),
        ("IT", 40)]
deptColumns = ["dept_name", "dept_id"]
deptDF = spark.createDataFrame(data=dept, schema=deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)
  • Create two tables
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")
  • Inner join

# Inner join in pyspark.sql.DataFrame API
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "inner") \
     .show(truncate=False)

# SQL INNER JOIN (implicit join syntax)
spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id") \
     .show(truncate=False)

# SQL INNER JOIN (explicit join syntax)
spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id") \
     .show(truncate=False)
  • Left join
    A left (a.k.a. left outer) join returns all rows from the left dataset regardless of whether a match is found in the right dataset. When the join expression does not match, it assigns null to the columns coming from the right dataset, and records from the right dataset with no match are dropped.

# Left join in pyspark.sql.DataFrame API
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "left") \
     .show(truncate=False)

# SQL LEFT JOIN
spark.sql("select * from EMP e LEFT OUTER JOIN DEPT d ON e.emp_dept_id == d.dept_id") \
     .show(truncate=False)
  • Right join

# Right join in pyspark.sql.DataFrame API
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "right") \
     .show(truncate=False)

# SQL RIGHT JOIN
spark.sql("select * from EMP e RIGHT OUTER JOIN DEPT d ON e.emp_dept_id == d.dept_id") \
     .show(truncate=False)
  • Full join

# Full join in pyspark.sql.DataFrame API
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "full") \
     .show(truncate=False)

# SQL FULL JOIN
spark.sql("select * from EMP e FULL OUTER JOIN DEPT d ON e.emp_dept_id == d.dept_id") \
     .show(truncate=False)

You can read about anti-joins, semi-joins, and unions here.
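
As a brief preview, a minimal sketch of semi- and anti-joins reusing the EMP/DEPT dataframes above (left_semi keeps only the matching left rows and only the left-side columns; left_anti keeps the non-matching left rows):

# Semi join: employees whose department exists in deptDF
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "left_semi") \
     .show(truncate=False)

# Anti join: employees whose department has no match in deptDF
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "left_anti") \
     .show(truncate=False)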
