# PySpark
![](https://i.imgur.com/AYPpPkP.png)

---

### modules
- `import os`
- `os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/anaconda3/bin/python3.7'`
- `os.environ['PYSPARK_PYTHON'] = '/opt/anaconda3/bin/python3.7'`
- `os.environ["SPARK_HOME"] = '/opt/cloudera/parcels/CDH/lib/spark'`
- `import findspark`
- `findspark.init()`
- `import pyspark`
- `from pyspark import SparkContext,SparkConf`
- `from pyspark.sql import SparkSession,SQLContext, Row`

### commands
- `sc = pyspark.SparkContext(appName="WordCount")`
- `lines = sc.textFile("war_and_peace.txt",2)`
  `type(lines)`
- #### Count
  Returns the number of elements in the RDD:
  `lines.count()`
- #### Take
  Returns the first 5 elements of the RDD:
  `lines.take(5)`
- #### getNumPartitions
  Returns the number of partitions in the RDD:
  `lines.getNumPartitions()`
- #### Filter
  Drop empty lines and keep only the non-empty ones. Either form works:
  - `nonull = lines.filter(lambda x: x!="")`
    `nonull.count()`
  - `nonull = lines.filter(lambda x: len(x)>0)`
    `nonull.count()`
- #### flatMap
  Split the lines into words and count the number of words:
  - `words = nonull.flatMap(lambda line: line.split())`
    `words.count()`
    `words.take(20)`
- #### Map
  - Convert each word to uppercase:
    `uwords = words.map(lambda x: x.upper())`
    `uwords.take(20)`
  - Pair each word with a count of 1:
    `powords = words.map(lambda x:(x,1))`
    `powords.take(5)`
- #### ReduceByKey
  Sum the counts for each word:
  `wordfreq = powords.reduceByKey(lambda prev,new:prev+new)`
  `wordfreq.take(5)`
- #### Persist
  Sets this RDD's storage level so that its values are kept across operations after the first time it is computed.
  `lines.persist()`
- #### Accumulator
  A shared variable that tasks can only add to; the driver reads the total through `.value`. Here it counts blank lines:
  `blankLines = sc.accumulator(0)`
  - `def extractTextLines(line):`
    `    global blankLines`
    `    if line == "":`
    `        blankLines += 1`
    `    return line.split()`
  - `textLines = lines.flatMap(extractTextLines)`
  - `textLines.saveAsTextFile("textLinesoutput")`
  - `print("Blank lines %d" % (blankLines.value))`

  The count is only populated once an action (here `saveAsTextFile`) has run.

### DataFrame Commands
`spark = SparkSession.builder.appName('Dataframe').getOrCreate()`
- Ways to read a file:
  - `df = spark.read.csv("test1.csv")`
  - `df = spark.read.option('header','true').csv("test1.csv",inferSchema=True)`
  - `df = spark.read.csv("test1.csv",inferSchema=True,header=True)`
- Print the schema: `df.printSchema()`
- Return the first row: `df.head()`
- Summary statistics: `df.describe().show()`
- Select columns:
  - `df.select(["Name"]).show()`
  - `df.select(["Name","salary"]).show()`
- Adding a new column:
  `df = df.withColumn("Experience after 2 years",df["Experience"]+2)`
  `df.show()`
- Adding another column:
  `df = df.withColumn("Total Salary",df["Experience"]*df["Salary"])`
  `df.show()`
- Dropping a column:
  `df = df.drop("Experience after 2 years")`
  `df.show()`
- Rename a column:
  `df.withColumnRenamed("Name","Employee Name").show()`
- Drop null values: dropna()
  `df.dropna().show()`
  - how (`"any"` or `"all"`):
    `df.dropna(how="any").show()`
    `df.dropna(how="all").show()`
  - thresh (keep rows with at least this many non-null values; overrides `how`):
    `df.dropna(how="any",thresh=1).show()`
  - subset:
    `df.dropna(how="any",subset=['Age']).show()`
- Fill null values: fillna()
  - `df.fillna("Null Values").show()`: fills nulls in string columns only, because the fill value's type must match the column type
  - `df.fillna(22,subset=["age"]).show()`: fills nulls in the column 'Age'
- #### Filter Operations:
  - Operators on column expressions: `&`, `|`, `==`
  - Negation: `~`
  - `df.filter("salary<=20000").show()`
  - `df.filter("salary<=20000").select(["Name","Salary"]).show()`
  - `df.filter("salary>20000 or salary<15000").show()`
  - `df.filter("not salary<20000").show()`
- #### GroupBy command:
  - `df.groupBy("Departments").count().show()`
  - `df.groupBy("Name").avg("salary").show()`
  - agg:
    `df.agg({'Salary':'avg'}).show()`
- #### Windowspec:
  - `from pyspark.sql.window import Window`
    `from pyspark.sql.functions import row_number`
    `windowSpec = Window.partitionBy("departments").orderBy("salary")`
    `df.withColumn("row_number",row_number().over(windowSpec)).show()`
  - ##### Rank:
    `from pyspark.sql.functions import rank`
    `df.withColumn("rank",rank().over(windowSpec)).show()`
  - ##### Dense Rank:
    `from pyspark.sql.functions import dense_rank`
    `df.withColumn("dense_rank",dense_rank().over(windowSpec)).show()`
- #### Create temp view/Table:
  `df.createOrReplaceTempView("Employee")`
  - ##### SQL Query
    `spark.sql("select EmpName, dept_name,dept_id from Employee").show()`
- #### Read .json file:
  `zc = spark.read.json("zipcodes.json")`
- #### Write and read a Parquet file:
  `data =[("James ","","Smith","36636","M",3000), ("Michael ","Rose","","40288","M",4000), ("Robert ","","Williams","42114","M",4000), ("Maria ","Anne","Jones","39192","F",4000), ("Jen","Mary","Brown","","F",-1)]`
  `columns=["firstname","middlename","lastname","dob","gender","salary"]`
  `df=spark.createDataFrame(data,columns)`
  - Create the Parquet file:
    `df.write.parquet("Users/chandrik.sharma/Hadoop+Hive/people.parquet")`
  - Read the Parquet file:
    `parDF1 = spark.read.parquet("Users/chandrik.sharma/Hadoop+Hive/people.parquet")`
  - Then query it through a temp view:
    `parDF1.createOrReplaceTempView("People")`
    `spark.sql("select * from People").show()`

### Use Hive with PySpark
- `spark = SparkSession.builder.appName('Python Spark SQL').config("spark.sql.warehouse.dir", "hdfs://localhost:8000/user/hive/warehouse").enableHiveSupport().getOrCreate()`
- `spark.sql("show databases").show(3000)`
- `spark.sql("use hm").show()`
- `spark.sql("select * from emp").show()`

---

# Wednesday (23-11-2022)

# Handling Null Values: two ways
- ## FillNa()
  ![](https://i.imgur.com/FskgtGJ.png)
- ## Imputer
  ![](https://i.imgur.com/R0vepWS.png)
  ![](https://i.imgur.com/Qk4EHqH.png)
  - Another Example:
    ![](https://i.imgur.com/vzkgz5r.png)
    ![](https://i.imgur.com/zwtTrP2.png)

## One Hot Encoding
![](https://i.imgur.com/UDMedJ7.png)
![](https://i.imgur.com/YIuP8Ot.png)
![](https://i.imgur.com/IES5gWT.png)
- ### Using Pipeline
  ![](https://i.imgur.com/bXy9xHq.png)
  ![](https://i.imgur.com/OBHwh9i.png)

### Vector Assembler
![](https://i.imgur.com/cy42ru5.png)
![](https://i.imgur.com/Dw33uTs.png)

### Standard Scaler
![](https://i.imgur.com/EMzJZ2A.png)

### Linear Regression
![](https://i.imgur.com/YjfOlQE.png)

#### Evaluation
![](https://i.imgur.com/rwqyihU.png)
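
The evaluation step above exists only as a screenshot, so which metrics it reports is an assumption here. Two metrics commonly used for a linear regression are RMSE and R², which `pyspark.ml.evaluation.RegressionEvaluator` exposes as `metricName="rmse"` and `metricName="r2"`. The plain-Python sketch below shows what those two numbers measure; the helper names `rmse` and `r2` and the salary values are hypothetical, made up for illustration only.

```python
import math


def rmse(labels, predictions):
    """Root mean squared error: the typical size of an error, in label units."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


def r2(labels, predictions):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_label = sum(labels) / len(labels)
    # Residual sum of squares: error left over after the model's predictions.
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    # Total sum of squares: error of always predicting the mean label.
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot


# Hypothetical true salaries (label) and model outputs (prediction).
labels = [3000.0, 4000.0, 4000.0, 5000.0]
predictions = [3100.0, 3900.0, 4200.0, 4800.0]

print("RMSE:", rmse(labels, predictions))
print("R2:", r2(labels, predictions))
```

A lower RMSE means smaller errors in the label's own units, while an R² close to 1 means the model explains most of the variance in the label. On a real DataFrame these values would come from the evaluator rather than from hand-written helpers like these.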