---
tags: BigData-MS-2020
title: PySpark. DataFrames
---
# Glimpse at Data Analytics
Today we continue to work with Spark. Spark is unique in the way that it provides tools for low-level programming, machine learning, SQL analytics, stream processing, graph analysis - all within one framework. Today, our focus will be the Spark SQL module. It is important to remember that Spark is a data processing framework and was not designed for data management. Spark’s data processing formats, DataFrames and GraphFrames, support basic query operations. In this tutorial, we will look at how to write SQL queries for DataFrames.
Plan for today:
- PySpark
- Loading data
- Queries
## PySpark
PySpark is useful for people who work with Python and use their skills in the area of Big Data and distributed processing.
### Installing with Docker
For rapid prototyping, it is many prefer to use Jupyter Notebooks. Luckily for us, Jupyter provides a [Docker image](https://jupyter-docker-stacks.readthedocs.io/en/latest/using/specifics.html#apache-spark) with everything configured to run PySpark kernels.
To start a Jupyter notebook session, you will need to create a folder where you will store your data and notebooks.
```
mkdir folder/with/ipynb/for/pyspark
```
Then, run the services
```
docker run -d -p 8888:8888 -v folder/with/ipynb/for/pyspark:/home/jovyan/ jupyter/pyspark-notebook
```
Open Jupyter notebook in the browser. In case you are connected to university network, you can set up Jupyter on your machine to work with university cluster. Configure the container using this `docker-compose.yml` file
```yaml
version: "2.0"
services:
jupyter_notebook:
image: "jupyter/pyspark-notebook:399cbb986c6b"
volumes:
- ./persistent_storage/:/home/jovyan/ # to save the notebook outside the container
- ../hadoop/etc/hadoop:/hadoop-conf # configuration to connect to the cluster
environment:
HADOOP_CONF_DIR: "/hadoop-conf"
PYSPARK_SUBMIT_ARGS: "--master yarn pyspark-shell"
HADOOP_USER_NAME: "your_user_name" # ask TA for a user name or use previous credentials
PYSPARK_PYTHON: "/home/hadoop/pyspark/miniconda3/bin/python" # keep this as is. this is python path for the cluster
TINI_SUBREAPER: "true"
network_mode: host # need this to make sure cluster can connect to the driver
pid: host
restart: unless-stopped
command: start.sh jupyter notebook --NotebookApp.token='gJ6hcMeQ' --port 8888 # this is notebook port
mem_limit: 1000MB
```
### Installing Locally
PySpark can be installed locally with pip (or conda)
```shell
pip install pyspark
```
Set `SPARK_HOME=path/to/spark` and run the Python interpreter or Jupyter notebook. Import PySpark into your Python project (this will start a Spark session that runs locally)
```python
import pyspark
```
Check section for docker contained to learn how to configure your notebook to work with university cluster.
### PySpark on University Cluster
Ask your TA for access to Jupyter Notebook on university cluster. On the cluster, PySpark runs on top of YARN and HDFS. All the data are meant to be read from and written to HDFS. Your TA will send you:
- Notebook address
- Access token
- User folder name in HDFS
:::warning
For those that are connected to university network, all options are viable. Those who have only VPN access, requesting notebook credentials is the only option.
:::
:::warning
Datasets are quite large. However, if the cluster is too occupied and you want to run everything locally, you can download datasets for this lab [here](http://millionsongdataset.com/tasteprofile/#getting) ([mirror](https://edisk.university.innopolis.ru/edisk/Public/Fall%202019/Big%20Data/Spark%20SQL/songs_dataset.zip)) (~3Gb unpacked) and [here](https://www.yelp.com/dataset) ([mirror](https://edisk.university.innopolis.ru/edisk/Public/Fall%202019/Big%20Data/Spark%20SQL/yelp-dataset.tar.gz)) (~6Gb unpacked).
:::
## Spark DataFrames
So far we have worked with RDDs. Spark also provides such data abstractions as [Datasets and DataFrames](https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes). All of these data abstractions are related to each other. Dataset is related to RDDs and DataFrame is a Dataset with type Row (`Dataset[Row]`). For examples of how to convert RDD to DataFrame, read [examples in the documentation](https://spark.apache.org/docs/latest/sql-getting-started.html#interoperating-with-rdds).
## Starting a Project
The project can be created as a `py` file or a Jupyter notebook. We recommend using Jupyter notebook provided on the cluster. The main reason for this is the size of datasets.
Start the project by importing `spark` and creating `SparkSession`
```python
from pyspark.sql import SparkSession
# since everyone will be using cluster at the same time
# let's make sure that everyone has resource. that is why
# the configuration uses dynamic resource allocation and
# maximum 1 executor
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.dynamicAllocation.enabled", "true")\
.config("spark.dynamicAllocation.shuffleTracking.enabled", "true")\
.config("spark.dynamicAllocation.maxExecutors", "1")\
.getOrCreate()
```
## Loading Data
Spark can load various formats, starting with simple textfiles and ending with databases such as Hive. More information about supported data types is available on the [documentation page](https://spark.apache.org/docs/latest/sql-data-sources.html). At the moment we are interested in three data formats:
- [CSV](https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#manually-specifying-options)
- [Parquet](https://spark.apache.org/docs/latest/sql-data-sources-parquet.html)
- [JSON](https://spark.apache.org/docs/latest/sql-data-sources-json.html)
The first dataset that we are going to work with is `songs_dataset` [accessible in HDFS](http://namenode:9870/explorer.html#/songs_dataset) at `/songs_dataset/train_triplets.txt`. The size of the dataset is 2.8 GBs. The [data](http://millionsongdataset.com/tasteprofile/#desc) is stored in a tab separated format (TSV) without a header. The columns are `(user, song, play count)`. TSV is a form of CSV. The data can be imported with
```python
songs_df = spark.read.load("/songs_dataset/train_triplets.txt",
format="csv", sep="\t", inferSchema="true",
header="false")
```
Data import can take some time. You can inspect the shema of the data with
```python
songs_df.printSchema()
```
Since the file came without headers, the column names are not descriptive. You can assign normal names by executing
```python
songs_df = songs_df.withColumnRenamed("_c0", "user")\
.withColumnRenamed("_c1", "song")\
.withColumnRenamed("_c2", "play_count")
```
The data was originally stored in an inefficient format. Let's store this data as Parquet. You can inspect the results with web-interface (http://namenode:9870/explorer.html#/user/your_user/songs.parquet). The Parquet format stores the data with a schema, with compression, and is much more efficient then plain CSV. You can see that the total dataset size in Parquet format is about ~700Mb.
<!-- More on Parquet:
1. https://medium.com/@bufan.zeng/use-parquet-for-big-data-storage-3b6292598653
2. https://spark.apache.org/docs/latest/sql-data-sources-parquet.html
3. http://parquet.apache.org/documentation/latest/
4. https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html -->
## SQL Queries
Spark supports SQL queries. To execute queries, the DataFrame should be first registered
```python
songs_df.createOrReplaceTempView("songs")
```
Now you can execute SQL queries on a table with name `songs`.
```python
played_more_than_10_times = spark.sql("select song from songs where play_count > 10")
```
The result of SQL query is a DataFrame. You could have noticed that the response was instant. This is because the result was not computed yet. Try counting the number of entries in the new DataFrame to trigger the execution.
```python
played_more_than_10_times.count()
```
Write two more queries
- Query all the songs that have been listened by a given user
- Query all the songs that have been listened by a given user more than ten times
If you forgot how to write simple SQL queries, refer to [this guide](https://learnxinyminutes.com/docs/sql/).
Now let's move to real data processing. Kill the kernell to free the memory.
## Writing Spark SQL Queries for Yelp Dataset
Load the data and register tables
```python
business = spark.read.json("/yelp-dataset/yelp_academic_dataset_business.json")
reviews = spark.read.json("/yelp-dataset/yelp_academic_dataset_review.json")
users = spark.read.json("/yelp-dataset/yelp_academic_dataset_user.json")
business.createOrReplaceTempView("business")
reviews.createOrReplaceTempView("reviews")
users.createOrReplaceTempView("users")
```
Inspect the schema for these tables. Compare the schema with the [official schema description](https://www.yelp.com/dataset/documentation/main). Notice that Spark did not import all the fields correctly. Fields `users.friends` and `business.categories` have been imported as strings. We will need to account for that.
Spark SQL queries are similar to regular SQL queries. The full reference can be found in [documentation](https://spark.apache.org/docs/latest/sql-ref-syntax.html). Another useful resource is the list of built-in SQL functions that can be found [here](https://spark.apache.org/docs/latest/api/sql/) and [here](https://spark.apache.org/docs/latest/sql-ref.html).
One can define their own functions if that is necessary. For more information refer to [user-defined scalar functions](https://spark.apache.org/docs/latest/sql-ref-functions-udf-scalar.html).
Let us look at some more sophisticated query examples.
### Query 1
- Which state has more businesses registered on yelp? Group by state, count, sort by count in descending order
```python
spark.sql("select state, count(state) as count from business group by state order by count(state) desc").show()
```
The query describes itself. Here we [`group by`](https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-groupby.html) state and apply aggregation function [`count`](https://spark.apache.org/docs/latest/api/sql/#count). Declare the [`order by`](https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-orderby.html) criteria. The keyword `desc` indicates the descending order.
### Query 2
- How many are there unique restaurant categories?
```python
spark.sql("""
select count(distinct(*)) from (
select explode(split(categories, \",\s*\")) as category from business
)
""").show()
```
For this query, we should extract information about different categories from the field `categories`. You can notice that this field was imported as string, but it should be a list of strings. We are going to fix this using [`split`](https://spark.apache.org/docs/latest/api/sql/#split) function. After that, we are going to use database equivalent of flatMap [`explode`](https://spark.apache.org/docs/latest/api/sql/#explode) to transform this list into separate rows. The result of this operation can be passed on to the next query that will compute distinct categories and count them.
### Query 3
- Get distribution of restaurant categories in the city Phoenix, sort by count in descending order, show top 10
```python
spark.sql("""
select category, count(category) from
(
select explode(split(categories, \",\s*\")) as category
from business where city=\"Phoenix\"
)
group by category order by count(category) desc limit 10
""").show()
```
The process is similar to the previous query, except now we add the conditional statement [`where`](https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-where.html). After we retrieve the list of categories for the city of Phoenix, we need to group by category and count. We will [`limit`](https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-limit.html) the output to top 10 rows.
### Query 4
- How many people with more than 1000 friends?
```python
spark.sql("""
select
count(*) as friend_count
from
users
where
size(split(friends, \",\s*\")) > 1000
""").show()
```
The field `friends` in table `users` also was not imported correctly. We should split this field and determine the [`size`](https://spark.apache.org/docs/latest/api/sql/#size) of the result.
### Query 5
- Get restaurants that have their ratings decreased from 2014 to 2017
```python
spark.sql("""
with business_ratings as (
select
business_id, year(to_date(date)) as year, avg(stars) as rating
from
reviews group by business_id, year(to_date(date))
),
business_2014 as (
select
business_id, rating
from
business_ratings
where
year=2014
),
business_2017 as (
select
business_id, rating
from
business_ratings where year=2017
)
select
business_2014.business_id, business_2014.rating, business_2017.rating
from
business_2014
inner join
business_2017
on
business_2014.business_id=business_2017.business_id
where
business_2017.rating < business_2014.rating
""").show()
```
Using `with` statement we can create a temporary result. Information about business success can be extracted from `ratings`. The date column also did not import correctly. We can convert string representation of a date into the date format using [`to_date`](https://spark.apache.org/docs/latest/api/sql/#to_date). After, you can extract [`year`](https://spark.apache.org/docs/latest/api/sql/#year). First, we should compute business ratings by year. We store an intermediate query result into `business_ratings`. Now we need to filter only entries that have rows for 2014 and 2017. The results are stored in `business_2014` and `business_2014`. To get the final result, perform [`join`](https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-join.html) on the field `business_id` and filter those rows that satisfy the criteria.
### Query 6
- Get the list of people whose friends last time wrote a review for a chinese restaurant.
You will need to look at `reviews`, group by `user_id`. You are interested only in the last review that a person wrote. The `date` is stored as a string. Convert it to date format using `to_date`. Find the most recent date for a user using aggregation function `max`. Note that a person can write several reviews on the same day. Treat all of them as most recent. If the review was written for a Chinese restaurant, we should find who has this person in their friend list. The list of `friends` in stored in table `users`. The field `friends` is also stored as string, and you should split it first.
Implement this query yourself.
## Self-Check Questions
- What is a database index and how it helps increase the performance of query processing?
- Do Spark DataFrames have index similar to a database index?
- Do NoSQL databases have support for indexes?