---
tags: BigData
title: Lab 06 - Apache Spark Sql
---

# Lab 06 - Apache Spark SQL

**Course:** Big Data - IU S23
**Author:** Firas Jolha

# Dataset

- [FIFA World Cup Match Results from 1872 till 2022](https://raw.githubusercontent.com/aminebennaji19/FIFA-World-Cup-Qatar-2022/main/data/results.csv)

<!-- # Spark on Colab

- [Tutorial notebook](https://colab.research.google.com/drive/1RYCW98b8nWhj0Go62wA5MyfWkyYbZiSt?usp=sharing) -->

# Agenda

[toc]

<!-- - PC (6GB RAM, ~10GB Disk) -->

# Prerequisites

- A PC that can run a web browser and has an internet connection

# Objectives

- Learn about the Spark SQL library
- Practise on Spark MLlib
- Learn how we can use SQL queries in Spark

# Dataset Description

This dataset includes $44,341$ results of international football matches, starting from the very first official match in $1872$ up to $2022$. The matches range from the FIFA World Cup to the FIFI Wild Cup to regular friendly matches. The matches are strictly men's full internationals; the data does not include Olympic Games or matches where at least one of the teams was the nation's B-team, U-23 team, or a league select team.

**results.csv** includes the following columns:

- **date** - date of the match
- **home_team** - the name of the home team
- **away_team** - the name of the away team
- **home_score** - full-time home team score, including extra time, not including penalty shootouts
- **away_score** - full-time away team score, including extra time, not including penalty shootouts
- **tournament** - the name of the tournament
- **city** - the name of the city/town/administrative unit where the match was played
- **country** - the name of the country where the match was played
- **neutral** - TRUE/FALSE column indicating whether the match was played at a neutral venue

For the datasets of scorers and shootouts, you can check this [Kaggle data card](https://www.kaggle.com/datasets/martj42/international-football-results-from-1872-to-2017).

# Test

Go to the [link](https://colab.research.google.com/drive/1Nu0lk7TDJH6mrfMv4hp-AlnWUOMR1Ddz?usp=sharing) to do the test.

# Spark SQL

Spark SQL is a module for structured data processing. It allows you to query structured data using either SQL or the DataFrame API. The `pyspark.sql` module in Spark is used to perform SQL-like operations on the data stored in memory. You can either use the programming API to query the data or write ANSI SQL queries similar to an RDBMS. You can also mix both, for example, apply the API on the result of an SQL query.

The following are the important classes of the SQL module:

- `pyspark.sql.SparkSession` – the main entry point for DataFrame and SQL functionality.
- `pyspark.sql.DataFrame` – a distributed collection of data organized into named columns.
- `pyspark.sql.Column` – a column expression in a DataFrame.
- `pyspark.sql.Row` – a row of data in a DataFrame.
- `pyspark.sql.GroupedData` – an object type that is returned by `DataFrame.groupBy()`.
- `pyspark.sql.DataFrameNaFunctions` – methods for handling missing data (null values).
- `pyspark.sql.DataFrameStatFunctions` – methods for statistics functionality.
- `pyspark.sql.functions` – the standard built-in functions.
- `pyspark.sql.types` – the SQL data types available in Spark.
- `pyspark.sql.Window` – used to work with window functions.

Regardless of which approach you use, you have to create a SparkSession, which is the entry point to the Spark application. **Spark SQL** is one of the most used Spark modules for processing structured, columnar data.
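All of the examples below assume an active `SparkSession` named `spark` (in notebooks such as Colab it may already be provided). As a minimal sketch, assuming a local run and an arbitrary application name:

```python
from pyspark.sql import SparkSession

# Entry point for DataFrame and SQL functionality.
# The application name is arbitrary; add .master(...) or other
# configuration options as needed for your environment.
spark = SparkSession.builder \
    .appName("lab06_spark_sql") \
    .getOrCreate()
```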
Once you have a DataFrame created, you can interact with the data by using SQL syntax. In other words, Spark SQL brings native raw SQL queries to Spark, meaning you can run traditional ANSI SQL on a Spark DataFrame. In order to use SQL, first register a **temporary table/view on the DataFrame** using the `createOrReplaceTempView()` function. Once created, this table can be accessed throughout the SparkSession using `sql()`, and it will be dropped when the Spark session is terminated. Use the `sql()` method of the SparkSession object to run the query; this method returns a new DataFrame.

## Spark SQL Examples

1. Create SQL View

- Load the data and read the `results` dataframe.

```python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DateType

schema = StructType([
    StructField("date", DateType(), False),
    StructField("home_team", StringType(), False),
    StructField("away_team", StringType(), False),
    StructField("home_score", IntegerType(), False),
    StructField("away_score", IntegerType(), False),
    StructField("tournament", StringType(), False),
    StructField("city", StringType(), False),
    StructField("country", StringType(), False),
    StructField("neutral", BooleanType(), False),
])

# You can also use the spark.read.csv function
df = spark.read.format("csv").load("results.csv", header=True, schema=schema)
df
```

- Create the temporary view.

```python
df.createOrReplaceTempView("results_table")
```

2. Spark SQL to Select Columns

```python
# DataFrame API select query
df.select("home_team", "city", "country", "tournament") \
  .show(5)

# SQL select query
spark.sql("SELECT home_team, city, country, tournament FROM RESULTS_TABLE") \
  .show(5)
```

3. Filter Rows

To filter the rows of the data, you can use the `where()` function from the DataFrame API.

```python
# DataFrame API where()
df.select("country", "city", "home_team", "tournament") \
  .where("city == 'Moscow'") \
  .show(5)
```

Similarly, in SQL you can use the WHERE clause as follows.

```python
# SQL WHERE
spark.sql("""
    SELECT country, city, home_team, tournament
    FROM RESULTS_TABLE
    WHERE city = 'Moscow'
""").show(5)
```

4. Sorting

```python
# DataFrame API orderBy()
df.select("country", "city", "home_team", "tournament") \
  .where("city in ('London','Paris','Moscow')") \
  .orderBy("city") \
  .show(10)

# SQL ORDER BY
spark.sql("""
    SELECT country, city, home_team, tournament
    FROM RESULTS_TABLE
    WHERE city in ('London','Paris','Moscow')
    ORDER BY city
""").show(10)
```

5. Grouping

```python
# DataFrame API groupBy()
df.groupBy("city").count() \
  .show()

# SQL GROUP BY clause
spark.sql("""
    SELECT city, count(*) as count
    FROM RESULTS_TABLE
    GROUP BY city
""").show()
```

6. SQL Join Operations

The PySpark DataFrame `join()` method has the syntax below and can be called directly on a DataFrame.

```python
join(self, other, on=None, how=None)
```

The `join()` operation takes the following parameters and returns a DataFrame.

- param *other*: right side of the join
- param *on*: a string for the join column name, a list of column names, or a join expression
- param *how*: default `inner`. Must be one of `inner`, `cross`, `outer`, `full`, `full_outer`, `left`, `left_outer`, `right`, `right_outer`, `left_semi`, and `left_anti`.

You can also write a join expression by chaining the `where()` and `filter()` methods on the DataFrame, and you can join on multiple columns, as in the sketch below.
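For illustration only, here is a minimal sketch of a join on multiple columns combined with `where()`; the two DataFrames, their column names, and the values are made up for this sketch and are not part of the lab dataset.

```python
from pyspark.sql.functions import col

# Hypothetical data, only to illustrate a multi-column join condition.
left = spark.createDataFrame(
    [("Moscow", 2018, "Russia"), ("Paris", 1998, "France")],
    ["city", "year", "host"])
right = spark.createDataFrame(
    [("Moscow", 2018, 64), ("Paris", 1998, 64), ("London", 1966, 32)],
    ["city", "year", "matches"])

# Join on a list of column names, then filter the joined result.
left.join(right, ["city", "year"], "inner") \
    .where(col("matches") > 32) \
    .show()
```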
- Create two Spark dataframes

```python
emp = [(1, "Smith", -1, "2018", "10", "M", 3000),
       (2, "Rose", 1, "2010", "20", "M", 4000),
       (3, "Williams", 1, "2010", "10", "M", 1000),
       (4, "Jones", 2, "2005", "10", "F", 2000),
       (5, "Brown", 2, "2010", "40", "", -1),
       (6, "Brown", 2, "2010", "50", "", -1)]
empColumns = ["emp_id", "name", "superior_emp_id", "year_joined",
              "emp_dept_id", "gender", "salary"]

empDF = spark.createDataFrame(data=emp, schema=empColumns)
empDF.printSchema()
empDF.show(truncate=False)

dept = [("Finance", 10),
        ("Marketing", 20),
        ("Sales", 30),
        ("IT", 40)]
deptColumns = ["dept_name", "dept_id"]

deptDF = spark.createDataFrame(data=dept, schema=deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)
```

- Create two temporary views

```python
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")
```

- Inner join

```python
# Inner join in the pyspark.sql.DataFrame API
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "inner") \
  .show(truncate=False)

# SQL inner join (implicit join syntax)
spark.sql("select * from EMP e, DEPT d where e.emp_dept_id = d.dept_id") \
  .show(truncate=False)

# SQL inner join (explicit JOIN ... ON syntax)
spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id = d.dept_id") \
  .show(truncate=False)
```

- Left join

A **left** (a.k.a. **left outer**) join returns all rows from the left dataset regardless of whether a match is found in the right dataset; when the join expression does not match, it fills the right-side columns with null, and it drops the records from the right dataset for which no match is found.

```python
# Left join in the pyspark.sql.DataFrame API
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "left") \
  .show(truncate=False)

# SQL LEFT JOIN
spark.sql("select * from EMP e LEFT OUTER JOIN DEPT d ON e.emp_dept_id = d.dept_id") \
  .show(truncate=False)
```

- Right join

```python
# Right join in the pyspark.sql.DataFrame API
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "right") \
  .show(truncate=False)

# SQL RIGHT JOIN
spark.sql("select * from EMP e RIGHT OUTER JOIN DEPT d ON e.emp_dept_id = d.dept_id") \
  .show(truncate=False)
```

- Full join

```python
# Full join in the pyspark.sql.DataFrame API
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "full") \
  .show(truncate=False)

# SQL FULL JOIN
spark.sql("select * from EMP e FULL OUTER JOIN DEPT d ON e.emp_dept_id = d.dept_id") \
  .show(truncate=False)
```

You can read about anti-joins, semi-joins, and unions [here](https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-join.html).
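As a short sketch of the semi- and anti-join types listed in the `how` parameter above (an added illustration reusing `empDF`, `deptDF`, and the `EMP`/`DEPT` views): a left semi-join keeps only the left-side rows that have a match and returns only the left-side columns, while a left anti-join keeps the left-side rows that have no match.

```python
# Left semi-join: employees whose emp_dept_id matches some dept_id;
# only empDF's columns appear in the result.
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "left_semi") \
  .show(truncate=False)

# Left anti-join: employees with no matching department.
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "left_anti") \
  .show(truncate=False)

# SQL equivalents on the temporary views.
spark.sql("select * from EMP e LEFT SEMI JOIN DEPT d ON e.emp_dept_id = d.dept_id") \
  .show(truncate=False)
spark.sql("select * from EMP e LEFT ANTI JOIN DEPT d ON e.emp_dept_id = d.dept_id") \
  .show(truncate=False)
```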
# References

- [Spark By Examples](https://sparkbyexamples.com/)
- [Spark SQL guide](https://spark.apache.org/docs/latest/sql-programming-guide.html)
- [PySpark documentation](https://spark.apache.org/docs/latest/api/python/reference/index.html)