# DSCI-551 Lecture Notes - 3
## Week 11
- ec2
- sudo service mongod start
- starts the mongod systemd service
- mongosh
- use dsci555;
- db.students.insert({"name": "john"})
- // collection.insert() is deprecated
- db.students.insertOne({"name": "john"})
- db.students.find()
- return 2 documents
- db.students.insertOne({"name": "john"})
- db.students.find()
- return 3 documents
- **=** select * from students;
- db.students.insertOne({})
- insert empty object
- valid operation
- db.students.find()
- return 4 documents
- every document has an '_id'; it is the primary key of the students collection
- db.students.insert([{},{}])
- db.students.find()
- return 6 documents
- db.students.insertMany([{},{}])
- db.students.insertMany([{_id: 1},{_id: 2}])
- db.students.find()
- return 10 documents
- the last two are
- { _id: 1 }
- { _id: 2 }
- db.students.insertOne({"_id": {"uid": 123, "did": 456}})
- db.students.find()
- return 11 documents
- the last one is
- { _id: { uid: 123, did: 456 } }
- the value of '_id' can be anything but must be unique
- db.students.insertOne({_id: 3, name: 'bill'})
- '_id' can be quoted or unquoted
- string values can use double or single quotes
- db.students.find()
- return 12 documents
- db.students.insertOne({_id: 4, full.name: 'bill smith'})
- SyntaxError
- keys don't need to be quoted unless they contain a special character (like the dot here)
- db.students.insertOne({_id: 4, 'full.name': 'bill smith'})
- db.students.find()
- return 13 documents
- db.students.update({_id: 1}, {$set: {age: 25}})
- two parameters in this query: a filter and an update document
- // { _id: 1, age: 25 }
- = update students set age = 25 where _id = 1;
- $set updates the field if it exists, otherwise adds it
- db.students.update({_id: 1}, {$set: {age: 26}})
- actually updates the existing field
- // { _id: 1, age: 26 }
- db.students.update({_id: 1}, {$set: {age: 27, name: 'david'}})
- age is updated and name is added
- more flexible than MySQL
- db.students.update({}, {$set: {age: 25}})
- an empty filter {} matches all documents
- but update() only modifies the first match by default
- db.students.updateMany({}, {$set: {age: 25}})
- this query will update all documents
- db.students.update({}, {$set: {age: 26}}, {multi: true})
- update all
- true must be lowercase (from JSON)
- // = update students set age = 26;
- db.students.update({_id: 2}, {$unset: {age: null}})
- $unset removes the field (see the pymongo sketch below)
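The same inserts and updates can be issued from Python. A minimal pymongo sketch (the connection details and database name are illustrative; assumes a local mongod):

```python
# Hypothetical pymongo equivalent of the shell commands above.
from pymongo import MongoClient

client = MongoClient('localhost', 27017)  # assumed local mongod
db = client['dsci551']

db.students.insert_one({'name': 'john'})            # insertOne
db.students.insert_many([{'_id': 1}, {'_id': 2}])   # insertMany

# $set updates the field if present, otherwise adds it.
db.students.update_one({'_id': 1}, {'$set': {'age': 25}})

# update_many applies to every matching document (shell: {multi: true}).
db.students.update_many({}, {'$set': {'age': 26}})

# $unset removes the field; the value given ('' here) is ignored.
db.students.update_one({'_id': 2}, {'$unset': {'age': ''}})
```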
- db.students.find()
- db.students.update({name: 'bill'}, {$set: {age: 30}})
- db.students.find({age: 27})
- return: [ { _id: 2, age: 27 } ]
- db.students.find({age: {$gt: 27}})
- gt = greater than
- return: [ { _id: 3, name: 'bill', age: 30 } ]
- db.students.find({age: {$gte: 27}})
- gte = greater than or equal to
- return: [ { _id: 2, age: 27 }, { _id: 3, name: 'bill', age: 30 } ]
- db.students.find({age: {$lte: 27}})
- return 11 documents
- db.students.find({age: {$lt: 27}})
- return 10 documents
- db.students.find({age: {$eq: 27}})
- equal to
- db.students.find({age: {$ne: 27}})
- not equal to
- // gt, gte, lt, lte, eq, ne
- db.students.find({name: 'john'})
- return 3 documents
- db.students.find({name: /bill/})
- db.students.update({_id: 1}, {$set: {name: 'david smith'}})
- db.students.find({name: /smith/})
- db.students.update({name: 'john'}, {$set: {name: 'john smith'}}, {multi: true})
- db.students.find({name: /smith/})
- return 4 documents
- db.students.find({name: /Smith/})
- return 0 documents
- db.students.find({name: /Smith/i})
- return 4 documents
- i: case-insensitive
- = select * from students where name like '%smith%';
- In MySQL, it's case-insensitive.
- db.students.find()
- db.students.find({name: /smith/i})
- db.students.find({name: /^david/i})
- finds names starting with 'david'
- db.students.find({name: /john$/i})
- finds names ending with 'john'
- return 1 document
- db.students.find({name: /john/i})
- return 4 documents
- db.students.find({name: /^john/i})
- return 4 documents
- db.students.find({name: {$not: /john/i}})
- finds all documents **whose name doesn't contain 'john'**, plus those **that don't have a name attribute at all**
- return 9 documents
- db.students.find({name: {$not: /john/i, $exists: true}})
- finds all documents **whose name doesn't contain 'john'** and that do have a name attribute
- return 2 documents
- db.students.find({$and: [{age: {$gt: 25}}, {age: {$lt: 30}}]})
- db.students.find({$or: [{age: {$gt: 25}}, {age: {$lt: 30}}]})
- // $and $or $not $exists (see the pymongo sketch below)
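Regexes and the logical operators carry over directly to Python; a sketch with pymongo, reusing the hypothetical `db` handle from the sketch above:

```python
import re

# Case-insensitive "name contains smith" (shell: {name: /smith/i}).
list(db.students.find({'name': re.compile('smith', re.IGNORECASE)}))

# Name must exist AND must not contain 'john'.
list(db.students.find({'name': {'$not': re.compile('john', re.I), '$exists': True}}))

# Explicit $and / $or, same shape as in the shell.
list(db.students.find({'$and': [{'age': {'$gt': 25}}, {'age': {'$lt': 30}}]}))
list(db.students.find({'$or': [{'age': {'$gt': 25}}, {'age': {'$lt': 30}}]}))
```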
- use dsci551
- show collections
- db.students.find()
- use dsci
- show collections
- db.students.find()
- db.students.find({scores: {$in: [2, 3]}})
- return 2 documents
- [
- { _id: 6, scores: [ 3, 2, 3, 4, 5 ] }
- { _id: 7, scores: [ 3, 8, 9 ] }
- ]
- db.students.find({scores: {$all: [2, 3]}})
- return 1 document
- [{ _id: 6, scores: [ 3, 2, 3, 4, 5 ] }]
- # projection
- db.students.find({age: {$gt: 20}})
- returns [ { _id: 1, name: 'john', gender: 'M', age: 27 }, { _id: 3, age: 25 } ]
- db.students.find({age: {$gt: 20}}, {name: 1, age: 1})
- returns [ { _id: 1, name: 'john', age: 27 }, { _id: 3, age: 25 } ]
- 'name: 1' means include this attribute
- db.students.find({age: {$gt: 20}}, {name: 1, age: 1, _id: 0})
- '_id: 0' means exclude this attribute
- db.students.find({age: {$gt: 20}}, {name: 1, age: 0, _id: 0})
- MongoServerError: Cannot do exclusion on field age in inclusion projection
- in an inclusion projection, you cannot exclude any field except '_id'
- db.students.find({age: {$gt: 20}}, {name: 0, age: 0, _id: 0})
- it works
- returns only the 'gender' field; every attribute listed with 0 is excluded
- db.students.find({age: {$gt: 20}}, {name: 0, age: 0, _id: 1})
- it works
- returns fields of '_id', 'gender'
- db.students.find({age: {$gt: 20}}, {name: 0, age: 0})
- it works
- returns fields of '_id', 'gender'
- db.students.find({age: {$gt: 20}}, {name: 0, age: 1})
- it doesn't work
- db.students.find({age: {$gt: 20}}, {name: 1, age: 0})
- it doesn't work
- db.students.find({age: {$gt: 20}}, {name: 1, age: 1})
- it works
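The rule these experiments show: a projection must be either all inclusions or all exclusions, and `_id` is the only field that may go against the grain. The same second-argument projection works from pymongo (again reusing the hypothetical `db` handle):

```python
# Inclusion projection: keep name and age, drop _id.
list(db.students.find({'age': {'$gt': 20}}, {'name': 1, 'age': 1, '_id': 0}))

# Exclusion projection: drop name and age, keep all other fields.
list(db.students.find({'age': {'$gt': 20}}, {'name': 0, 'age': 0}))

# Mixing 1 and 0 on non-_id fields raises OperationFailure, as in the shell.
```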
- db.students.find().count()
- return: 11
- db.students.find({age: {$gt: 20}}).count()
- db.students.count({age: {$gt: 20}}, {name: 1, age: 1})
- deprecated: Collection.count(); use countDocuments() instead
- return: 2
- // = select count(*) from students where age > 20;
- db.students.find()
- ec2
- unzip country.zip
- mongoimport --file country.json --db world2 --collection country
- mongoimport --file city.json --db world2 --collection city
- mongoimport --file countrylanguage.json --db world2 --collection countrylanguage
- use world2
- show collections
- db.country.find()
- db.country.find().limit(1)
- db.country.find().limit(2)
- db.country.find().limit(1).skip(1)
- return the 2nd document
- db.country.find({}, {code: 1, GNP: 1, _id: 0}).sort({GNP: 1}).limit(5)
- sort in increasing order
- db.country.find({}, {code: 1, GNP: 1, _id: 0}).sort({GNP: -1}).limit(5)
- sort in decreasing order
- db.country.find({}, {code: 1, GNP: 1, _id: 0}).sort({GNP: -1}).limit(10)
- db.country.find().limit(1)
- db.country.distinct('Continent')
- db.country.distinct('Continent').length
- db.country.distinct('Continent', {GNP: {$gt: 10000000}})
- return [] // zero
- db.country.distinct('Continent', {GNP: {$gt: 1000000}})
- return ['Asia', 'Europe', 'North America']
- two arguments
- the 2nd argument is condition
- // = select distinct continent from country where GNP > 1000000;
- db.country.find({GNP: {$gt: 1000000}}).distinct('Continent')
- TypeError
- // select continent, max(GNP) from country group by continent
- db.country.aggregate({$group: {_id: '$Continent'}})
- return
- [
- { _id: 'Asia'},
- { _id: 'South America' },
- { _id: 'North America' },
- { _id: 'Europe' },
- { _id: 'Oceania' },
- { _id: 'Antarctica' },
- { _id: 'Africa' }
- ]
- db.country.aggregate({$group: {_id: 'Continent'}})
- return [ { _id: 'Continent' } ]
- db.country.aggregate({$group: {_id: 'Continent', max_gnp: {$max: '$GNP'}}})
- return [ { _id: 'Continent', max_gnp: 8510700 } ]
- db.country.aggregate({$group: {_id: '$Continent', max_gnp: {$max: '$GNP'}}})
- return
- [
- { _id: 'Asia', max_gnp: 3787042},
- { _id: 'South America', max_gnp: 776739 },
- { _id: 'North America', max_gnp: 8510700 },
- { _id: 'Europe', max_gnp: 2133367 },
- { _id: 'Oceania', max_gnp: 351182 },
- { _id: 'Antarctica', max_gnp: 0 },
- { _id: 'Africa', max_gnp: 116729 }
- ]
- // = select Continent, max(GNP) from country group by Continent;
- db.country.aggregate({$group: {_id: '$Continent', stat: {$max: '$GNP'}}})
- db.country.aggregate({$group: {_id: '$Continent', stat: {$avg: '$GNP'}}})
- db.country.aggregate({$group: {_id: '$Continent', stat: {$sum: '$GNP'}}})
- db.country.aggregate({$group: {_id: '$Continent', stat: {$count: '$GNP'}}})
- MongoServerError
- db.country.aggregate({$group: {_id: '$Continent', stat: {$sum: 1}}})
- return the 'count' result
- why does this count?
- each document contributes a 1, so summing the 1s gives the count
- // = select Continent, max(GNP) from country group by Continent having count(*) > 30
- db.country.aggregate({$group: {_id: '$Continent', max_gnp: {$max: '$GNP'}, cnt: {$sum: 1}}})
- this only implements the GROUP BY part; the HAVING part comes next
- db.country.aggregate({$group: {_id: '$Continent', max_gnp: {$max: '$GNP'}, cnt: {$sum: 1}}}, {$match: {cnt: {$gt: 30}}})
- db.country.aggregate({$group: {_id: '$Continent', max_gnp: {$max: '$GNP'}, cnt: {$sum: 1}}}, {$match: {cnt: {$gt: 30}}}, {$sort: {cnt: -1}})
- db.country.aggregate({$group: {_id: '$Continent', max_gnp: {$max: '$GNP'}, cnt: {$sum: 1}}}, {$match: {cnt: {$gt: 30}}}, {$sort: {cnt: -1}}, {$limit: 2})
- db.country.aggregate({$group: {_id: '$Continent', max_gnp: {$max: '$GNP'}, cnt: {$sum: 1}}}, {$match: {cnt: {$gt: 30}}}, {$sort: {cnt: -1}}, {$limit: 2}, {$skip: 1})
- db.country.aggregate({$group: {_id: '$Continent', max_gnp: {$max: '$GNP'}, cnt: {$sum: 1}}}, {$match: {cnt: {$gt: 30}}}, {$sort: {cnt: -1}})
- [
- { _id: 'Africa', max_gnp: 116729, cnt: 58 },
- { _id: 'Asia', max_gnp: 3787042, cnt: 51 },
- { _id: 'Europe', max_gnp: 2133367, cnt: 46 },
- { _id: 'North America', max_gnp: 8510700, cnt: 37 }
- ]
- db.country.aggregate({$group: {_id: '$Continent', max_gnp: {$max: '$GNP'}, cnt: {$sum: 1}}}, {$match: {cnt: {$gt: 30}}}, {$sort: {cnt: -1}}, {$limit: 1})
- db.country.aggregate({$group: {_id: '$Continent', max_gnp: {$max: '$GNP'}, cnt: {$sum: 1}}}, {$match: {cnt: {$gt: 30}}}, {$sort: {cnt: -1}}, {$limit: 1}, {$skip: 1})
- returns nothing, because $limit: 1 leaves only one document and $skip: 1 then skips it
- db.country.aggregate({$group: {_id: '$Continent', max_gnp: {$max: '$GNP'}, cnt: {$sum: 1}}}, {$match: {cnt: {$gt: 30}}}, {$sort: {cnt: -1}}, {$limit: 1}, {$skip: 1}, {$project: {cnt: 0}})
- value discussion
- sort's value must be either 1 or -1
- project's value must be either 1 or 0
- db.country.aggregate({$match: {GNP: {$gt: 10000}}}, {$group: {_id: '$Continent', max_gnp: {$max: '$GNP'}, cnt: {$sum: 1}}}, {$match: {cnt: {$gt: 30}}}, {$sort: {cnt: -1}}, {$limit: 1}, {$skip: 1}, {$project: {cnt: 0}})
- db.country.aggregate({$match: {GNP: {$gt: 10000}}}, {$group: {_id: '$Continent', max_gnp: {$max: '$GNP'}, cnt: {$sum: 1}}}, {$match: {cnt: {$gt: 30}}}, {$sort: {cnt: -1}}, {$limit: 1}, {$skip: 1})
- // = select Continent, max(GNP) from country where GNP > 10000 group by Continent having count(*) > 30 order by count(*) desc limit 1 offset 1;
- order of evaluation: WHERE, then GROUP BY, then HAVING (see the pipeline sketch below)
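The stages above can be passed to aggregate() one by one in mongosh, but the canonical form (and the only form pymongo accepts) is a single pipeline array. A sketch of the full SQL equivalent, reusing the hypothetical `client` from the earlier sketches; note that $skip must come before $limit to reproduce OFFSET semantics:

```python
# Pipeline mirroring: WHERE -> GROUP BY -> HAVING -> ORDER BY -> OFFSET/LIMIT.
pipeline = [
    {'$match': {'GNP': {'$gt': 10000}}},              # WHERE GNP > 10000
    {'$group': {'_id': '$Continent',
                'max_gnp': {'$max': '$GNP'},
                'cnt': {'$sum': 1}}},                 # GROUP BY Continent
    {'$match': {'cnt': {'$gt': 30}}},                 # HAVING count(*) > 30
    {'$sort': {'cnt': -1}},                           # ORDER BY count(*) DESC
    {'$skip': 1},                                     # OFFSET 1
    {'$limit': 1},                                    # LIMIT 1
]
list(client['world2'].country.aggregate(pipeline))
```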
- JOIN
- db.city.find().limit(1)
- db.country.find().limit(1)
- // = select country.Name, city.Name from country join city on country.Capital = city.ID;
- db.country.aggregate({$lookup: {from: 'city', localField: 'Capital', foreignField: 'ID', as: 'res'}})
- each country joins with at most one city (its capital)
- db.country.aggregate({$match: {Code: 'USA'}}, {$lookup: {from: 'city', localField: 'Capital', foreignField: 'ID', as: 'res'}})
- db.country.aggregate({$match: {Code: 'USA'}}, {$lookup: {from: 'city', localField: 'Capital', foreignField: 'ID', as: 'res'}}, {$project: {Name: 1, 'res.Name': 1, _id: 0}})
- return: [{ Name: 'United States', res: [ { Name: 'Washington' } ] }]
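The same $lookup as a pymongo pipeline (a sketch; field names as in the world2 data above, `client` as in the earlier sketches):

```python
# Join country.Capital to city.ID; matches land in the 'res' array.
list(client['world2'].country.aggregate([
    {'$match': {'Code': 'USA'}},
    {'$lookup': {'from': 'city', 'localField': 'Capital',
                 'foreignField': 'ID', 'as': 'res'}},
    {'$project': {'Name': 1, 'res.Name': 1, '_id': 0}},
]))
# -> [{'Name': 'United States', 'res': [{'Name': 'Washington'}]}]
```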
## Week 13
- ec2 instance
- sudo service mysqld stop
- cd etc/hadoop
- nano core-site.xml
- comment out the added property
- nano hdfs-site.xml
- comment out the added property
- cat core-site.xml
- cat hdfs-site.xml
- cd
- ls
- copy WordCount.java to folder ~/dsci-551
- cd dsci-551
- ls
- cat WordCount.java
- extends means?
- subclass extends superclass
- params in the classname
- Mapper <Object, Text, Text, IntWritable>
- Mapper <[Input key], [Input value], [Output key], [Output value]>
- Type in Hadoop
- Text in Hadoop => String in Java
- IntWritable => Integer
- Hadoop uses its own data types for efficient serialization during shuffling
- the map task serializes its output and sends it to the reduce task
- analogous to json.dump() (see the sketch below)
- the reduce task deserializes the received data
- analogous to json.load()
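Spelling that analogy out in Python (an analogy only; Hadoop actually ships Writable binary encodings, not JSON):

```python
import json

# Map side: serialize an intermediate (key, value) pair before the shuffle...
payload = json.dumps(['hello', 1])   # cf. json.dump() to a stream

# ...reduce side: deserialize what arrived over the network.
key, value = json.loads(payload)     # cf. json.load() from a stream
```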
- compile MapReduce code
- cat /etc/bashrc
- hadoop com.sun.tools.javac.Main WordCount.java
- ls WordCount*.class
- WordCount$TokenizerMapper.class
- WordCount$IntSumReducer.class
- $: inner class
- jar cf wc.jar WordCount*.class
- cd input/
- ls -l
- hello1.txt, hello2.txt
- cat hello1.txt
- cat hello2.txt
- cd ..
- hadoop jar wc.jar WordCount input output
- Note
- In this example,
- 2 files in input folder => 2 map tasks
- There are 3 files in the output folder
- the _SUCCESS file is just a completion marker
- the other 2 files are the 2 partitions, one per reduce task
- terminal
- cd output
- ls
- cd ..
- cat WordCount.java
- The sample code differs from the official example in these lines:
- job.setCombinerClass(IntSumReducer.class);
- job.setNumReduceTasks(2);
- cat input/hello1.txt
- cat input/hello2.txt
- cat output/part-r-00000
- cat output/part-r-00001
- output files are sorted
- What happens if we only have one reduce task?
- nano WordCount.java
- comment the following line
- // job.setNumReduceTasks(2);
- history
- !1014
- hadoop com.sun.tools.javac.Main WordCount.java
- !jar
- jar cf wc.jar WordCount*.class
- hadoop jar wc.jar WordCount input output-r1
- cd output-r1
- ls
- only 2 files
- part-r-00000 _SUCCESS
- cat part-r-00000
- data is sorted by key
- input & output
- cd ..
- cd input/
- ls
- cat hello1.txt
- for every line, you create input k,v pairs here
- key is the byte offset of the line
- value is the content of a line
- cd ..
- cat WordCount.java
- ls input/
- 2 files
- nano input/hello1.txt
- For the input of the map task, there will be 3 k-v pairs, each representing a line (see the offset check below).
- for the content like this
- hello world
- hello this world
- hello that world
- <0, 'hello world'>
- <12, 'hello this world'>
- <29, 'hello that world'>
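Those byte offsets can be checked in Python; each key is the running total of the preceding line lengths, newline included:

```python
text = 'hello world\nhello this world\nhello that world\n'

offset = 0
for line in text.splitlines(keepends=True):  # keep the '\n' so lengths are exact
    print((offset, line.rstrip('\n')))
    offset += len(line)
# (0, 'hello world')
# (12, 'hello this world')
# (29, 'hello that world')
```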
- cat WordCount.java
## Week 14
- ec2
- cd dsci-551
- cd input
- ls -l
- cat hello1.txt
- content
- hello world
- hello this world
- hello that world
- cd ..
- cat WordCount.java
- 3 input k-v pairs for the TokenizerMapper
- corresponding to the 3 lines
- (0, 'hello world')
- (12, 'hello this world')
- (29, 'hello that world')
- for map(Object key, Text value, Context context)
- input
- key: 0, value: hello world
- key: 12, value: hello this world
- key: 29, value: hello that world
- output
- (hello, 1) (world, 1)
- (hello, 1) (this, 1) (world, 1)
- (hello, 1) (that, 1) (world, 1)
- Then the output is grouped by key (see the Python simulation below)
- (hello, [1, 1, 1]) (world, [1, 1, 1]) (this, [1]) (that, [1])
- for reduce(Text key, Iterable<IntWritable> values, Context context)
- input
- (hello, [1, 1, 1])
- (world, [1, 1, 1])
- (this, [1])
- (that, [1])
- Input is handled by FileInputFormat
- Output is handled by FileOutputFormat
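The whole map → group → reduce dataflow for this input, simulated in plain Python (a sketch of the dataflow only, not of Hadoop's execution model):

```python
from collections import defaultdict

lines = {0: 'hello world', 12: 'hello this world', 29: 'hello that world'}

# Map: each (offset, line) pair emits (word, 1) pairs.
mapped = [(w, 1) for line in lines.values() for w in line.split()]

# Shuffle/group: gather every value emitted for the same key.
groups = defaultdict(list)
for word, one in mapped:
    groups[word].append(one)
# {'hello': [1, 1, 1], 'world': [1, 1, 1], 'this': [1], 'that': [1]}

# Reduce: (word, [1, 1, ...]) -> (word, count).
counts = {word: sum(ones) for word, ones in groups.items()}
print(counts)  # {'hello': 3, 'world': 3, 'this': 1, 'that': 1}
```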
- cat SQL2MR.java
- hadoop com.sun.tools.javac.Main WordCount.java
- jar cf wc.jar WordCount*.class
- ls output*
- rm -rf output*
- hadoop jar wc.jar WordCount input output
- cd output/
- ls
- cd ..
- cat input/hello1.txt
- cat input/hello2.txt
- cat output/part-r-00000
- cat WordCount.java
- In this example, the combiner and reducer use the same class.
- Combiner is a local reduction.
- locally groups 3 × (hello, 1) by key into a single (hello, 3)
- the combiner runs on the same machine as the map task (see the sketch below)
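A sketch of what the combiner saves, with one map task per input file (the contents of hello2.txt are assumed here for illustration):

```python
from collections import Counter

# One list of map outputs per map task (= per input file).
map_outputs = [
    ['hello', 'world', 'hello', 'this', 'world', 'hello', 'that', 'world'],  # hello1.txt
    ['hello', 'world'],                                                      # hello2.txt (assumed)
]

# Combiner: local reduction on each map task's machine, before the shuffle.
combined = [Counter(words) for words in map_outputs]
# task 0 ships (hello, 3) instead of three separate (hello, 1) pairs

# Reducer: merge the partial counts that were shuffled in.
total = sum(combined, Counter())
print(total)
```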
- EC2 - Spark
- pyspark
- **country = spark.read.json('country.json')**
- country.show(2)
- country.limit(2).show()
- country.select('Continent').show(2)
- country.select('Continent').count()
- country.select('Continent').distinct().show()
- country.where('GNP > 10000').select('Continent').distinct().show()
- country.where('GNP > 10000').select('Continent').distinct().orderBy('Continent', ascending=False).show()
- country.groupBy('Continent').max('GNP').show()
- import pyspark.sql.functions as fc
- country.groupBy('Continent').agg(fc.max('GNP')).show()
- country.groupBy('Continent').agg(fc.max('GNP').alias('max_gnp'), fc.count('*').alias('cnt')).show()
- select Continent, max(GNP) max_gnp, count(*) cnt from country group by Continent
- country.createOrReplaceTempView('country')
- spark.sql('select Continent, max(GNP) max_gnp, count(*) cnt from country group by Continent').show()
- = country.groupBy('Continent').agg(fc.max('GNP').alias('max_gnp'), fc.count('*').alias('cnt')).show()
- spark.sql('select Continent, max(GNP) max_gnp, count(*) cnt from country group by Continent having cnt > 5').show()
- = country.groupBy('Continent').agg(fc.max('GNP').alias('max_gnp'), fc.count('*').alias('cnt')).filter('cnt > 5').show()
- **city = spark.read.json('city.json')**
- city.show(2)
- city.createOrReplaceTempView('city')
- spark.sql('select country.Name, city.Name from country, city where country.Capital = city.ID').show()
- spark.sql('select country.Name, city.Name from country, city where country.Capital = city.ID').count()
- 232
- country.filter('Capital = 0').show()
- country.join(city, country.Capital == city.ID).count()
- 232
- country.join(city, country.Capital == city.ID, how='left').count()
- 239
- country.join(city, country.Capital == city.ID, how='right').count()
- 4079
- city.count()
- 4079
- country.join(city, country.Capital == city.ID, how='full').count()
- 4086
- **cl = spark.read.json('countrylanguage.json')**
- cl.show(2)
- cl.filter('CountryCode="USA"').show()
- usa = cl.filter('CountryCode="USA"').select('Language')
- can = cl.filter('CountryCode="CAN"').select('Language')
- usa.intersect(can).show()
- usa.intersectAll(can).show()
- usa.unionAll(can).count()
- 24
- usa.union(can).count()
- 24
- union doesn't remove duplicates
- usa.union(can).distinct().count()
- remove duplicates in this way
- usa.show()
- can.show()
- usa.subtract(can).show()
- Spark
- spark
- sc
- data = sc.parallelize([2,2,3,5,4,8,10], 2)
- def printf(p):
- print(list(p))
- data.foreachPartition(printf)
- [2, 2, 3]
- [5, 4, 8, 10]
- data.getNumPartitions()
- 2
- data.collect()
- [2, 2, 3, 5, 4, 8, 10]
- data.take(2)
- [2, 2]
- data.count()
- 7
- data.sum()
- 34
- data.max()
- 10
- data.min()
- 2
- data.mean()
- 4.85...
- data.reduce(lambda U, x: U + x)
- 34
- import functools as fl
- fl.reduce
- <built-in function reduce>
- def add(U, x):
- return U + x
- fl.reduce(add, [2, 2, 3])
- 7
- fl.reduce(add, [5, 4, 8, 10])
- 27
- fl.reduce(add, [7, 27])
- 34
- data.sum()
- = data.reduce(add)
- 34
- data.max()
- = data.reduce(max)
- 10
- data.min()
- = data.reduce(min)
- 2
- data.count()
- 7
- reduce() can only take one function as a parameter
- for count(), we need two different functions
- one to count elements within a partition, the other to sum the partial counts
- = data.aggregate( xx, xx, xx )
- data.aggregate(0, combFunc, redFunc) takes 3 params
- 0: the initial accumulator value, used in every partition
- combFunc: folds one element at a time into a partition's accumulator
- redFunc: merges the accumulators of the partitions
- fl.reduce(combFunc, [2, 2, 3], 0)
- fl.reduce(lambda U, x: U + 1, [2, 2, 3], 0)
- 3
- equals
- U = 0
- for x in [2, 2, 3]:
- U = U + 1
- def combFunc(U, x):
- return U + 1
- fl.reduce(lambda U, x: U + 1, [2, 2, 3], 0)
- 3
- fl.reduce(lambda U, x: U + 1, [5, 4, 8, 10], 0)
- 4
- fl.reduce(lambda U, x: U + x, [3, 4], 0)
- 7
- def redFunc(U, x):
- return U + x
- data.aggregate(0, combFunc, redFunc)
- 7
- data.mean()
- 4.8...
- data.aggregate((0, 0), lambda U, x: (U[0] + x, U[1] + 1), lambda U, V: (U[0] + V[0], U[1] + V[1]))
- (34, 7)
- res = data.aggregate((0, 0), lambda U, x: (U[0] + x, U[1] + 1), lambda U, V: (U[0] + V[0], U[1] + V[1]))
- res[0] / res[1]
- 4.8 ...
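Putting the corrected pieces together: the zero value must be the pair (0, 0), and both functions must return a pair. A sketch against the same RDD (data = sc.parallelize([2,2,3,5,4,8,10], 2)):

```python
# (running_sum, running_count) accumulator for computing the mean.
zero = (0, 0)

seq_op = lambda U, x: (U[0] + x, U[1] + 1)          # fold one element into a partition's pair
comb_op = lambda U, V: (U[0] + V[0], U[1] + V[1])   # merge two partitions' pairs

total, count = data.aggregate(zero, seq_op, comb_op)  # -> (34, 7)
print(total / count)                                  # 4.857...
```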
- lines = sc.textFile('input/hello1.txt', 2)
- lines.collect()
- def printf(p):
- print(list(p))
- lines.foreachPartition(printf)
- ['hello world', 'hello this world']
- ['hello that world']
- lines.map(lambda s: s.split()).collect()
- [['hello', 'world'], ['hello', 'this', 'world'], ['hello', 'that', 'world']]
- lines.flatMap(lambda s: s.split()).collect()
- ['hello', 'world', 'hello', 'this', 'world', 'hello', 'that', 'world']
- data = lines.map(lambda s: s.split())
- data.foreachPartition(printf)
- [['hello', 'world'], ['hello', 'this', 'world']]
- [['hello', 'that', 'world']]
- data = lines.flatMap(lambda s: s.split())
- data.foreachPartition(printf)
- ['hello', 'world', 'hello', 'this', 'world']
- ['hello', 'that', 'world']
- Note
- Q: Do map() and flatMap() shuffle data?
- A: No, there is no shuffling in map() or flatMap().
- ec2
- data = lines.flatMap(lambda s: s.split())
- data.collect()
- ['hello', 'world', 'hello', 'this', 'world', 'hello', 'that', 'world']
- data.map(lambda t: (t, 1)).collect()
- [('hello', 1), ('world', 1), ('hello', 1), ('this', 1), ('world', 1), ('hello', 1), ('that', 1), ('world', 1)]
- data.map(lambda t: (t, 1)).groupByKey().collect()
- note: the values display as an opaque iterable object until converted with list()
- data.map(lambda t: (t, 1)).groupByKey().mapValues(list).collect()
- [('world', [1, 1, 1]), ('this', [1]), ('hello', [1, 1, 1]), ('that', [1])]
- data.map(lambda t: (t, 1)).groupByKey().map(lambda kv: (kv[0], list(kv[1]))).collect()
- [('world', [1, 1, 1]), ('this', [1]), ('hello', [1, 1, 1]), ('that', [1])]
- data.map(lambda t: (t, 1)).reduceByKey(lambda U, x: U + x).collect()
- [('world', 3), ('this', 1), ('hello', 3), ('that', 1)]
- data.map(lambda t: (t, 1)).reduceByKey(lambda U, x: U + x).foreachPartition(printf)
- [('world', 3), ('this', 1)]
- [('hello', 3), ('that', 1)]
- lines.foreachPartition(printf)
- ['hello world', 'hello this world']
- ['hello that world']
- ec2
- country = spark.read.json('country.json')
- country_rdd = country.rdd
- country_rdd
- MapPartitionsRDD[51] at javaToPython at NativeMethodAccessorImpl.java:0
- country_rdd.take(2)
- country_rdd.collect()
- country_rdd.count()
- 239
- country_rdd.take(1)
- country_rdd.map(lambda r: r['GNP']).take(2)
- country_rdd.map(lambda r: r['GNP'] > 10000).take(2)
- Find the maximum GNP in each continent.
- country_rdd.map(lambda r: (r['Continent'], r['GNP'])).collect()
- country_rdd.map(lambda r: (r['Continent'], r['GNP'])).reduceByKey(lambda U, x: max(U, x)).collect()
- [('North America', 8510700.0), ('Asia', 3787042.0), ('Africa', 116729.0), ('Europe', 2133367.0), ('South America', 776739.0), ('Oceania', 351182.0), ('Antarctica', 0.0)]
- country_rdd.map(lambda r: r['Continent']).collect()
- country_rdd.map(lambda r: r['Continent']).distinct().collect()
- country_rdd.take(2)
- country_rdd.map(lambda r: (r['Continent'], r['GNP'])).collect()
- country_rdd.map(lambda r: (r['Continent'], r['GNP'])).sortByKey(False).take(10)
- country_rdd.map(lambda r: (r['Continent'], r['GNP'])).sortByKey().take(10)
- JOIN
- rdd1 = sc.parallelize([(1, 2), (1, 3), (2, 4)], 2)
- rdd2 = sc.parallelize([(1, 2), (3, 4)], 2)
- rdd1.join(rdd2).collect()
- [(1, (2, 2)), (1, (3, 2))]
- # inner join
- rdd1.leftOuterJoin(rdd2).collect()
- [(1, (2, 2)), (1, (3, 2)), (2, (4, None))]
- rdd1.rightOuterJoin(rdd2).collect()
- [(1, (2, 2)), (1, (3, 2)), (3, (None, 4))]
- # (k, v) (k, w) => (k, (v, w))
- Union & Intersect & Subtract
- rdd1.union(rdd2).collect()
- [(1, 2), (1, 3), (2, 4), (1, 2), (3, 4)]
- rdd1.union(rdd2).foreachPartition(printf)
- [(1, 2)]
- [(1, 3), (2, 4)]
- [(1, 2)]
- [(3, 4)]
- rdd1.foreachPartition(printf)
- [(1, 2)]
- [(1, 3), (2, 4)]
- rdd2.foreachPartition(printf)
- [(1, 2)]
- [(3, 4)]
- rdd1.intersection(rdd2).collect()
- rdd1.subtract(rdd2).collect()
- rdd1 = sc.parallelize([(1, 2), (1, 3), (2, 4), (2, 4)], 2)
- rdd1.subtract(rdd2).collect()
- [(2, 4), (2, 4), (1, 3)]