# PySpark

---
### modules
- `import os`
- `os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/anaconda3/bin/python3.7'`
- `os.environ['PYSPARK_PYTHON'] = '/opt/anaconda3/bin/python3.7'`
- `os.environ["SPARK_HOME"] = '/opt/cloudera/parcels/CDH/lib/spark'`
- `import findspark`
- `findspark.init()`
- `import pyspark`
- `from pyspark import SparkContext,SparkConf`
- `from pyspark.sql import SparkSession,SQLContext, Row`
### commands
- `sc = pyspark.SparkContext(appName="WordCount")`
- `lines = sc.textFile("war_and_peace.txt",2)`
`type(lines)`
- #### Count
returns the number of elements in the RDD: `lines.count()`
- #### Take
returns the first 5 elements of the RDD: `lines.take(5)`
- #### getNumPartitions
returns the number of partitions in the RDD: `lines.getNumPartitions()`
- #### Filter empty lines. Display only the non-empty lines
- `nonull = lines.filter(lambda x: x!="")`
`nonull.count()`
- `nonull = lines.filter(lambda x: len(x)>0)`
`nonull.count()`
- #### flatMap
separate the words and count the number of words
- `words = nonull.flatMap(lambda line: line.split())`
`words.count()`
`words.take(20)`
- #### Map
- convert each word to uppercase
`uwords = words.map(lambda x: x.upper())`
`uwords.take(20)`
- pair each word with a count of 1 (these steps are combined into a full word-count sketch at the end of this list):
`powords = words.map(lambda x: (x, 1))`
`powords.take(5)`
- #### ReduceByKey
`wordfreq = powords.reduceByKey(lambda prev,new:prev+new)`
`wordfreq.take(5)`
- #### Persist
Set this RDD's storage level to persist its values across operations after the first time it is computed.
`lines.persist()`
- #### Accumulator
is a shared variable that tasks can only add to; used here to count the blank lines
`blankLines = sc.accumulator(0)`
- `def extractTextLines(line):`
`    global blankLines`
`    if line == "":`
`        blankLines += 1`
`    return line.split()`
- `textLines = lines.flatMap(extractTextLines)`
- `textLines.saveAsTextFile("textLinesoutput")`
- `print("Blank lines %d" % (blankLines.value))`
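Putting the steps above together, a minimal end-to-end sketch of the word count (only `SparkContext.getOrCreate` and `takeOrdered` are not shown in the notes above):

```python
import pyspark

# reuse the running context if one already exists
sc = pyspark.SparkContext.getOrCreate()

wordfreq = (sc.textFile("war_and_peace.txt", 2)
              .filter(lambda line: len(line) > 0)          # drop blank lines
              .flatMap(lambda line: line.split())          # split lines into words
              .map(lambda word: (word, 1))                 # pair each word with 1
              .reduceByKey(lambda prev, new: prev + new))  # sum the counts per word

# ten most frequent words, ordered by descending count
print(wordfreq.takeOrdered(10, key=lambda kv: -kv[1]))
```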
### DataFrame Commands
`spark = SparkSession.builder.appName('Dataframe').getOrCreate()`
- Ways to read a file:
- `df = spark.read.csv("test1.csv")`
- `df = spark.read.option('header','true').csv("test1.csv",inferSchema=True)`
- `df = spark.read.csv("test1.csv",inferSchema=True,header=True)`
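Instead of `inferSchema`, the schema can also be declared explicitly; a small sketch assuming `test1.csv` has the columns used in the examples below (Name, Age, Experience, Salary):

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# assumed column names/types; adjust to the real file
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Experience", IntegerType(), True),
    StructField("Salary", IntegerType(), True),
])

df = spark.read.csv("test1.csv", header=True, schema=schema)
df.printSchema()
```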
- print schema: `df.printSchema()`
- display first row: `df.head()`
- `df.describe().show()`
- `df.select(["Name"]).show()`
- `df.select(["Name","salary"]).show()`
- Adding a new column:
`df = df.withColumn("Experience after 2 years",df["Experience"]+2)`
`df.show()`
- Adding another column:
`df = df.withColumn("Total Salary",df["Experience"]*df["Salary"])`
`df.show()`
- Dropping a column:
`df = df.drop("Experience after 2 years")`
`df.show()`
- Rename a column:
`df.withColumnRenamed("Name","Employee Name").show()`
- Drop null values: dropna()
`df.dropna().show()`
- how ("any" drops a row containing any null, "all" drops only rows where every value is null):
`df.dropna(how="any").show()`
`df.dropna(how="all").show()`
- thresh: keep rows with at least `thresh` non-null values (overrides `how`):
`df.dropna(how="any",thresh=1).show()`
- subset: only look at the listed columns when deciding to drop:
`df.dropna(how="any",subset=['Age']).show()`
- Fill null values: fillna()
- `df.fillna("Null Values").show()`: a string value fills nulls only in string-type columns (a numeric value would fill only numeric columns)
- `df.fillna(22,subset=["age"]).show()`: fills nulls in the 'age' column with 22
- #### Filter Operations:
- conditions can be combined with `&` (and), `|` (or), `~` (not) and comparison operators such as `==`, `<=` (see the sketch after these examples)
- `df.filter("salary<=20000").show()`
- `df.filter("salary<=20000").select(["Name","Salary"]).show()`
- `df.filter("salary>20000 or salary<15000").show()`
- `df.filter("not salary<20000").show()`
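The same conditions written with Column expressions and the `&`, `|`, `~` operators (each condition must be parenthesised):

```python
# equivalent filters using Column expressions instead of SQL strings
df.filter((df["Salary"] >= 15000) & (df["Salary"] <= 20000)).show()  # AND
df.filter((df["Salary"] > 20000) | (df["Salary"] < 15000)).show()    # OR
df.filter(~(df["Salary"] < 20000)).show()                            # NOT
```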
- #### GroupBy command:
- `df.groupBy("Departments").count().show()`
- `df.groupBy("Name").avg("salary").show()`
- agg: `df.agg({'Salary':'avg'}).show()`
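Several aggregations can be computed in one pass with `agg()` and `pyspark.sql.functions`; a short sketch using the `Departments` and `Salary` columns assumed above:

```python
from pyspark.sql import functions as F

df.groupBy("Departments").agg(
    F.count("*").alias("employees"),
    F.avg("Salary").alias("avg_salary"),
    F.max("Salary").alias("max_salary"),
).show()
```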
- #### Windowspec:
- `from pyspark.sql.window import Window`
`from pyspark.sql.functions import row_number`
`windowSpec = Window.partitionBy("departments").orderBy("salary")`
`df.withColumn("row_number",row_number().over(windowSpec)).show()`
- ##### Rank: `from pyspark.sql.functions import rank`
`df.withColumn("rank",rank().over(windowSpec)).show()`
- ##### Dense Rank: `from pyspark.sql.functions import dense_rank`
`df.withColumn("dense_rank",dense_rank().over(windowSpec)).show()`
- #### Create temp view/Table:
`df.createOrReplaceTempView("Employee")`
- ##### SQL Query
`spark.sql("select EmpName, dept_name,dept_id from Employee").show()`
- #### Read .json file:
`zc = spark.read.json("zipcodes.json")`
- #### Write and read Parquet files:
`data = [("James ","","Smith","36636","M",3000),`
`("Michael ","Rose","","40288","M",4000),`
`("Robert ","","Williams","42114","M",4000),`
`("Maria ","Anne","Jones","39192","F",4000),`
`("Jen","Mary","Brown","","F",-1)]`
`columns = ["firstname","middlename","lastname","dob","gender","salary"]`
`df = spark.createDataFrame(data,columns)`
- create parquet file:
`df.write.parquet("Users/chandrik.sharma/Hadoop+Hive/people.parquet")`
- read parquet file:
`parDF1 = spark.read.parquet("Users/chandrik.sharma/Hadoop+Hive/people.parquet")`
Then,
`parDF1.createOrReplaceTempView("People")`
`spark.sql("select * from People").show()`
### use hive with Pyspark
- `spark = SparkSession.builder.appName('Python Spark SQL').config("spark.sql.warehouse.dir", "hdfs://localhost:8000/user/hive/warehouse").enableHiveSupport().getOrCreate()`
- `spark.sql("show databases").show(3000)`
- `spark.sql("use hm").show()`
- `spark.sql("select * from emp").show()`
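Writing back to Hive works through the same session; a hedged sketch (the table name `emp_backup` is made up for illustration):

```python
# read a Hive table into a DataFrame, then save a copy as a new managed table
emp_df = spark.sql("select * from hm.emp")
emp_df.write.mode("overwrite").saveAsTable("hm.emp_backup")
```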
---
---
---
# Wednesday (23-11-2022)
# Handling Null Values - two ways
- ## FillNa()
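A minimal `fillna()` sketch on a toy DataFrame with missing values (the names, values, and column labels are made up for illustration):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("HandlingNulls").getOrCreate()

data = [("Alice", 31, 10, 30000),
        ("Bob", None, 8, 25000),
        ("Carol", 30, None, 20000),
        (None, 24, 3, None)]
df = spark.createDataFrame(data, ["Name", "Age", "Experience", "Salary"])

df.fillna("Unknown", subset=["Name"]).show()        # fill nulls in the string column
df.fillna(0, subset=["Age", "Experience"]).show()   # fill chosen numeric columns with 0
```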

- ## Imputer
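A hedged sketch of `pyspark.ml.feature.Imputer`, continuing with the toy `df` from the `fillna()` sketch above; it replaces nulls with the column mean ("median" is also supported):

```python
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=["Age", "Experience", "Salary"],
    outputCols=["Age_imputed", "Experience_imputed", "Salary_imputed"],
).setStrategy("mean")

# note: for integer input columns the imputed value is truncated to an integer;
# cast to double first if the fractional mean should be kept
imputer.fit(df).transform(df).show()
```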


- Another Example:


## One Hot Encoding
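A hedged sketch of one-hot encoding a categorical column with `StringIndexer` followed by `OneHotEncoder` (the toy `emp` DataFrame and column names are made up; in Spark 2.x the encoder class was `OneHotEncoderEstimator`):

```python
from pyspark.ml.feature import StringIndexer, OneHotEncoder

emp = spark.createDataFrame(
    [("HR", 30000), ("IT", 25000), ("Sales", 20000), ("IT", 40000)],
    ["Department", "Salary"],
)

indexer = StringIndexer(inputCol="Department", outputCol="dept_index")      # string -> numeric index
encoder = OneHotEncoder(inputCols=["dept_index"], outputCols=["dept_vec"])  # index -> sparse 0/1 vector

indexed = indexer.fit(emp).transform(emp)
encoded = encoder.fit(indexed).transform(indexed)
encoded.show()
```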



- ### using Pipeline
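The same stages can be chained with `pyspark.ml.Pipeline`, which fits and applies them in order; a sketch continuing with the toy `emp` DataFrame above (the `VectorAssembler` stage is covered in the next section):

```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

pipeline = Pipeline(stages=[
    StringIndexer(inputCol="Department", outputCol="dept_index"),
    OneHotEncoder(inputCols=["dept_index"], outputCols=["dept_vec"]),
    VectorAssembler(inputCols=["dept_vec", "Salary"], outputCol="features"),
])

model = pipeline.fit(emp)       # fits every stage in order
model.transform(emp).show()     # applies all fitted stages
```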


### Vector Assembler
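`VectorAssembler` packs several input columns into a single vector column, which is what the ML estimators expect as `featuresCol`; a sketch on a toy DataFrame (names and values are made up):

```python
from pyspark.ml.feature import VectorAssembler

people = spark.createDataFrame(
    [("Alice", 31, 10, 30000), ("Bob", 29, 8, 25000),
     ("Carol", 30, 4, 20000), ("Dan", 24, 3, 18000)],
    ["Name", "Age", "Experience", "Salary"],
)

assembler = VectorAssembler(inputCols=["Age", "Experience"], outputCol="features")
assembled = assembler.transform(people)      # no fit step: it is a pure transformer
assembled.select("features", "Salary").show()
```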


### Standard Scaler
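`StandardScaler` rescales the assembled feature vector (unit variance, and zero mean when `withMean=True`); a sketch continuing from the `assembled` DataFrame above:

```python
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaled_features",
                        withMean=True, withStd=True)
scaler.fit(assembled).transform(assembled).select("features", "scaled_features").show()
```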

### Linear Regression
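A hedged sketch of `pyspark.ml.regression.LinearRegression`, predicting `Salary` from the assembled `features` column above (with so few toy rows the model is fit on the full data; split into train/test on real data):

```python
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol="features", labelCol="Salary")
model = lr.fit(assembled)      # use assembled.randomSplit([0.75, 0.25]) on real data

print("Coefficients:", model.coefficients)
print("Intercept:", model.intercept)
model.transform(assembled).select("features", "Salary", "prediction").show()
```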

#### Evaluation
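A sketch of evaluating the fitted model with `RegressionEvaluator` (in practice run it on held-out predictions, not the training data used here):

```python
from pyspark.ml.evaluation import RegressionEvaluator

predictions = model.transform(assembled)
evaluator = RegressionEvaluator(labelCol="Salary", predictionCol="prediction",
                                metricName="rmse")
print("RMSE:", evaluator.evaluate(predictions))
print("R2:  ", evaluator.setMetricName("r2").evaluate(predictions))

# the fitted model also carries a training summary with the same metrics
print(model.summary.rootMeanSquaredError, model.summary.r2)
```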
