---
title: Big Data. 006. RDDs i Hadoop Spark SQL utilitzant Python.
tags: DAM, Big Data
---
[Link en MarkDown](https://hackmd.io/@JdaXaviQ/HkfrRD2Es)
# DAM. Big Data 006. RDDs i Hadoop Spark SQL utilitzant Python

## L'API Python d'Spark
Tot i que Spark està escrit en Scala que compila a JVM podem utilitzar-lo emprant Python a travès de biblioteques especialitzades.
Necessitarem instal·lar al nostre sistema els paquets __pyspark__ i __findspark__.
```python
#Instal·lació al nostre sistema operatiu dels paquets Python necessris.
!pip3 install pyspark
!pip3 install findspark
```
Defaulting to user installation because normal site-packages is not writeable
Requirement already satisfied: pyspark in /home/xavi/.local/lib/python3.10/site-packages (3.3.1)
Requirement already satisfied: py4j==0.10.9.5 in /home/xavi/.local/lib/python3.10/site-packages (from pyspark) (0.10.9.5)
Defaulting to user installation because normal site-packages is not writeable
Requirement already satisfied: findspark in /home/xavi/.local/lib/python3.10/site-packages (2.0.1)
Ara ja podem escriure el nostre codi Python
```python
import findspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
# Inicialitzem Spark
findspark.init()
# Generem el context
sc = SparkContext()
# Creating a spark session
spark = SparkSession \
.builder \
.appName("DAM-M15. Python Spark DataFrames: ús bàsic") \
.config("spark.opció.configuració", "valor de configuració") \
.getOrCreate()
```
22/10/30 22:34:31 WARN Utils: Your hostname, xavi-portatil resolves to a loopback address: 127.0.1.1; using 172.18.42.211 instead (on interface wlo1)
22/10/30 22:34:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/10/30 22:34:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
```python
print(spark)
```
<pyspark.sql.session.SparkSession object at 0x7f26769511b0>
## Resilient Distributed Datasets o RDDs
RDDS (Resilient Distributed Datasets) són una abstracció de dades que Spark és capaç de generar a partir d'informació desada en sistemes de fitxers distribuïts tipus HDFS. Aquests tipus de dades tenen estructura tabular i permeten consultes tipus SQL.
### Creació del nostre primer RDDs
Amb les següents sentències, podem generar un RDDs que conté els 30 primers nombres naturals:
```python
data = range(1,31)
# print first element of iterator
print(data[0])
print(f"{len(data)=}")
xrangeRDD = sc.parallelize(data, 4)
print(f"{xrangeRDD}")
```
1
len(data)=30
PythonRDD[1] at RDD at PythonRDD.scala:53
### Transformacions d'RDDs
Una transformació és una operació sobre un RDD que genera un nou RDD. Les transformacions són executades de manera '_lazy_' (mandrosa), que vol dir que no s'avaluen fins que no són requerides per una operació.
Per exemple, podem generar un nou RDD a partir del nostre RDD anterior aplicant una transformació que duplica cada membre del nostre RDD original:
```python
doblaRDD = xrangeRDD.map(lambda x: x*2)
filteredRDD = doblaRDD.filter(lambda x : x<10)
print(filteredRDD.collect())
print(f"{filteredRDD.count()=}")
```
[Stage 0:> (0 + 4) / 4]
[2, 4, 6, 8]
filteredRDD.count()=4
### Desant els resultats en caché
```python
import time
test = sc.parallelize(range(1,500000), 4)
test.cache()
t1 = time.time()
# first count will trigger evaluation of count *and* cache
count1 = test.count()
dt1 = time.time() - t1
print("dt1: ", dt1)
t2 = time.time()
# second count operates on cached data only
count2 = test.count()
dt2 = time.time() - t2
print("dt2: ", dt2)
```
dt1: 0.48540806770324707
dt2: 0.17197632789611816
### Un Dataframe a partir de dades reals en format JSON
En primer lloc, crearem el nostre fitxer de dades JSON:
```python
import json
alumnes = [{"nom": "Jaume", "cicle":"DAM"},{"nom": "Alicia", "cicle":"DAW"},{"nom": "Eudald", "cicle":"ASIX"},{"nom": "Robert", "cicle":"DAM"},{"nom": "Elvira", "cicle":"DAW"}]
with open("alumnes.json", "w") as fitxer:
fitxer.write(json.dumps(alumnes))
```
```python
# Read the dataset into a spark dataframe using the `read.json()` function
df = spark.read.json("alumnes.json").cache()
df.show()
df.printSchema()
```
+-----+------+
|cicle| nom|
+-----+------+
| DAM| Jaume|
| DAW|Alicia|
| ASIX|Eudald|
| DAM|Robert|
| DAW|Elvira|
+-----+------+
root
|-- cicle: string (nullable = true)
|-- nom: string (nullable = true)
### Explorant les dades:
```python
# Register the DataFrame as a SQL temporary view
df.createTempView("alumnes")
df.select("nom").show()
df.select(df["nom"]).show()
spark.sql("SELECT nom FROM alumnes").show()
```
+------+
| nom|
+------+
| Jaume|
|Alicia|
|Eudald|
|Robert|
|Elvira|
+------+
+------+
| nom|
+------+
| Jaume|
|Alicia|
|Eudald|
|Robert|
|Elvira|
+------+
+------+
| nom|
+------+
| Jaume|
|Alicia|
|Eudald|
|Robert|
|Elvira|
+------+
### Consultes simples de dades agregades.
```python
spark.sql("Select cicle, count(cicle) as count from alumnes group by cicle").show()
```
+-----+-----+
|cicle|count|
+-----+-----+
| DAW| 2|
| DAM| 2|
| ASIX| 1|
+-----+-----+
Ja només ens queda tancar la sessió que hem obert al començament:
```python
spark.stop()
```