--- title: Big Data. 006. RDDs i Hadoop Spark SQL utilitzant Python. tags: DAM, Big Data --- [Link en MarkDown](https://hackmd.io/@JdaXaviQ/HkfrRD2Es) # DAM. Big Data 006. RDDs i Hadoop Spark SQL utilitzant Python ![](http://spark.apache.org/images/spark-logo.png) ## L'API Python d'Spark Tot i que Spark està escrit en Scala que compila a JVM podem utilitzar-lo emprant Python a travès de biblioteques especialitzades. Necessitarem instal·lar al nostre sistema els paquets __pyspark__ i __findspark__. ```python #Instal·lació al nostre sistema operatiu dels paquets Python necessris. !pip3 install pyspark !pip3 install findspark ``` Defaulting to user installation because normal site-packages is not writeable Requirement already satisfied: pyspark in /home/xavi/.local/lib/python3.10/site-packages (3.3.1) Requirement already satisfied: py4j==0.10.9.5 in /home/xavi/.local/lib/python3.10/site-packages (from pyspark) (0.10.9.5) Defaulting to user installation because normal site-packages is not writeable Requirement already satisfied: findspark in /home/xavi/.local/lib/python3.10/site-packages (2.0.1) Ara ja podem escriure el nostre codi Python ```python import findspark from pyspark import SparkContext, SparkConf from pyspark.sql import SparkSession # Inicialitzem Spark findspark.init() # Generem el context sc = SparkContext() # Creating a spark session spark = SparkSession \ .builder \ .appName("DAM-M15. Python Spark DataFrames: ús bàsic") \ .config("spark.opció.configuració", "valor de configuració") \ .getOrCreate() ``` 22/10/30 22:34:31 WARN Utils: Your hostname, xavi-portatil resolves to a loopback address: 127.0.1.1; using 172.18.42.211 instead (on interface wlo1) 22/10/30 22:34:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 22/10/30 22:34:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable ```python print(spark) ``` <pyspark.sql.session.SparkSession object at 0x7f26769511b0> ## Resilient Distributed Datasets o RDDs RDDS (Resilient Distributed Datasets) són una abstracció de dades que Spark és capaç de generar a partir d'informació desada en sistemes de fitxers distribuïts tipus HDFS. Aquests tipus de dades tenen estructura tabular i permeten consultes tipus SQL. ### Creació del nostre primer RDDs Amb les següents sentències, podem generar un RDDs que conté els 30 primers nombres naturals: ```python data = range(1,31) # print first element of iterator print(data[0]) print(f"{len(data)=}") xrangeRDD = sc.parallelize(data, 4) print(f"{xrangeRDD}") ``` 1 len(data)=30 PythonRDD[1] at RDD at PythonRDD.scala:53 ### Transformacions d'RDDs Una transformació és una operació sobre un RDD que genera un nou RDD. Les transformacions són executades de manera '_lazy_' (mandrosa), que vol dir que no s'avaluen fins que no són requerides per una operació. Per exemple, podem generar un nou RDD a partir del nostre RDD anterior aplicant una transformació que duplica cada membre del nostre RDD original: ```python doblaRDD = xrangeRDD.map(lambda x: x*2) filteredRDD = doblaRDD.filter(lambda x : x<10) print(filteredRDD.collect()) print(f"{filteredRDD.count()=}") ``` [Stage 0:> (0 + 4) / 4] [2, 4, 6, 8] filteredRDD.count()=4 ### Desant els resultats en caché ```python import time test = sc.parallelize(range(1,500000), 4) test.cache() t1 = time.time() # first count will trigger evaluation of count *and* cache count1 = test.count() dt1 = time.time() - t1 print("dt1: ", dt1) t2 = time.time() # second count operates on cached data only count2 = test.count() dt2 = time.time() - t2 print("dt2: ", dt2) ``` dt1: 0.48540806770324707 dt2: 0.17197632789611816 ### Un Dataframe a partir de dades reals en format JSON En primer lloc, crearem el nostre fitxer de dades JSON: ```python import json alumnes = [{"nom": "Jaume", "cicle":"DAM"},{"nom": "Alicia", "cicle":"DAW"},{"nom": "Eudald", "cicle":"ASIX"},{"nom": "Robert", "cicle":"DAM"},{"nom": "Elvira", "cicle":"DAW"}] with open("alumnes.json", "w") as fitxer: fitxer.write(json.dumps(alumnes)) ``` ```python # Read the dataset into a spark dataframe using the `read.json()` function df = spark.read.json("alumnes.json").cache() df.show() df.printSchema() ``` +-----+------+ |cicle| nom| +-----+------+ | DAM| Jaume| | DAW|Alicia| | ASIX|Eudald| | DAM|Robert| | DAW|Elvira| +-----+------+ root |-- cicle: string (nullable = true) |-- nom: string (nullable = true) ### Explorant les dades: ```python # Register the DataFrame as a SQL temporary view df.createTempView("alumnes") df.select("nom").show() df.select(df["nom"]).show() spark.sql("SELECT nom FROM alumnes").show() ``` +------+ | nom| +------+ | Jaume| |Alicia| |Eudald| |Robert| |Elvira| +------+ +------+ | nom| +------+ | Jaume| |Alicia| |Eudald| |Robert| |Elvira| +------+ +------+ | nom| +------+ | Jaume| |Alicia| |Eudald| |Robert| |Elvira| +------+ ### Consultes simples de dades agregades. ```python spark.sql("Select cicle, count(cicle) as count from alumnes group by cicle").show() ``` +-----+-----+ |cicle|count| +-----+-----+ | DAW| 2| | DAM| 2| | ASIX| 1| +-----+-----+ Ja només ens queda tancar la sessió que hem obert al començament: ```python spark.stop() ```