# DSCI - 551 Lecture Notes - 3 ## Week 11 - ec2 - sudo service mongod start - start mongod.service - mongosh - mongosh - use dsci555; - db.students.insert({"name": "john"}) - // collection.insert() is deprecated - db.students.insertOne({"name": "john"}) - db.students.find() - return 2 documents - db.students.insertOne({"name": "john"}) - db.students.find() - return 3 documents - **=** select * from students; - db.students.insertOne({}) - insert empty object - valid operation - db.students.find() - return 4 documents - every document has a '_id', it's the primary key of students collection. - db.students.insert([{},{}]) - db.students.find() - return 6 documents - db.students.insertMany([{},{}]) - db.students.insertMary([{_id: 1},{_id: 2}]) - db.students.find() - return 10 documents - the last two are - { _id: 1 } - { _id: 2 } - db.students.insertOne({"_id": {"uid": 123, "did": 456}}) - db.students.find() - return 11 documents - the last one is - { _id: { uid: 123, did: 456 } } - the value of '_id' can be anything but should be unique - db.students.insertOne({_id: 3, name: 'bill'}) - '_id' can be quoted or not - value can be double quote or single quote - db.students.find() - return 12 documents - db.students.insertOne({_id: 4, full.name: 'bill smith'}) - SynataxError - key is unnecessay to quote unless if contains any special character. - db.students.insertOne({_id: 4, 'full.name': 'bill smith'}) - db.students.find() - return 13 documents - db.students.update({_id: 1}, {$set: {age: 25}}) - two parameters in this query - // { _id: 1, age: 25 } - = update students set age = 25 where _id = 1; - update if the attribute exists, otherwise insert it. - db.students.update({_id: 1}, {$set: {age: 26}}) - really update - // { _id: 1, age: 27 } - db.students.update({_id: 1}, {$set: {age: 27, name: 'david'}}) - age is update and name is insert - much flexible than MySQL - db.students.update({}, {$set: {age: 25}}) - empty primary key means all - but this query only updates the first one - db.students.updateMany({}, {$set: {age: 25}}) - this query will update all documents - db.students.update({}, {$set: {age: 26}, {mulit: true}}) - update all - true must be lowercase (from JSON) - // = update students set age = 26; - db.students.update({_id: 2}, {$unset: {age: null}}) - unset: remove data - db.students.find() - db.students.find({name: 'bill'}, $set: {age: 30}) - db.students.find({age: 27}) - return: [ { _id: 2, age: 27 } ] - db.students.find({age: {$gt: 27}}) - gt = greater than - return: [ { _id: 3, name: 'bill', age: 30 } ] - db.students.find({age: {$gte: 27}}) - gt = greater than or equal to - return: [ { _id: 2, age: 27 }, { _id: 3, name: 'bill', age: 30 } ] - db.students.find({age: {$lte: 27}}) - return 11 documents - db.students.find({age: {$lt: 27}}) - return 10 documents - db.students.find({age: {$eq: 27}}) - equal to - db.students.find({age: {$ne: 27}}) - not equal to - // gt, gte, lt, lte, eq, ne - db.students.find({name: 'john'}) - return 3 documents - db.students.find({name: /bill/}) - db.students.udpate({_id: 1, {$set: {name: 'david smith}}}) - db.students.find({name: /smith/}) - db.students.update({name: 'john'}, {$set: {name: 'john smith'}}, {multi: true}) - db.students.find({name: /smith/}) - return 4 documents - db.students.find({name: /Smith/}) - return 0 documents - db.students.find({name: /Smith/i}) - return 4 documents - i: case-insensitive - = select * from students where name like '%smith'; - In MySQL, it's case-insensitive. - db.students.find() - db.students.find({name: /smith/i}) - db.students.find({name: /^david/i}) - find name starts with 'david' - db.students.find({name: /john$/i}) - find name ends with 'john' - return 1 document - db.students.find({name: /john/i}) - return 4 documents - db.students.find({name: /^john/i}) - return 4 documents - db.students.find({name: {$not: /john/i}}) - find all documents **whose name doesn't contain 'john'** and **who doesn't have a name attribute** - return 9 documents - db.students.find({name: {$not: /john/i, $exists: true}}) - find all documents **whose name doesn't contain 'john'** and it must have a name attribute - return 2 documents - db.students.find({\$and: [{age: {\$gt: 25}}, {age: {\$lt: 30}}]}) - db.students.find({\$or: [{age: {$gt: 25}}, {age: {\$lt: 30}}]}) - // $and $or $not $exist - use dsci551 - show collections - db.studetns.find() - use dsci - show collections - db.students.find() - ![](https://i.imgur.com/WyDJMbM.png) - db.students.find({scores: {$in: [2, 3]}}) - return 2 documents - [ - { _id: 6, scores: [ 3, 2, 3, 4, 5 ] } - { _id: 7, scores: [ 3, 8, 9 ] } - ] - db.students.find({scores: {$all: [2, 3]}}) - return 1 document - [{ \_id: 6, scores: [ 3, 2, 3, 4, 5 ] }] - \# projection - db.students.find({age: {$gt: 20}}) - returns [ { _id: 1, name: 'john', gender: 'M', age: 27 }, { _id: 3, age: 25 } ] - db.students.find({age: {$gt: 20}}, {name: 1, age: 1}) - returns [ { _id: 1, name: 'john', age: 27 }, { _id: 3, age: 25 } ] - 'name: 1' means I want this attribute - db.students.find({age: {$gt: 20}}, {name: 1, age: 1, _id: 0}) - '_id: 0' means exclude this attribute - db.students.find({age: {$gt: 20}}, {name: 1, age: 0, _id: 0}) - MongoServerError: cannot do exclusion on field age inclusion projection - cannot exclude listed attributes except '_id' - db.students.find({age: {$gt: 20}}, {name: 0, age: 0, _id: 0}) - it works - returns only 'gender' field and ignore all attribute listed with 0 - db.students.find({age: {$gt: 20}}, {name: 0, age: 0, \_id: 1}) - it works - returns fields of '_id', 'gender' - db.students.find({age: {$gt: 20}}, {name: 0, age: 0}) - it works - returns fields of '_id', 'gender' - db.students.find({age: {$gt: 20}}, {name: 0, age: 1}) - it doesn't work - db.students.find({age: {$gt: 20}}, {name: 1, age: 0}) - it doesn't work - db.students.find({age: {$gt: 20}}, {name: 1, age: 1}) - it works - db.students.find().count() - return: 11 - db.students.find({age: {$gt: 20}}).count() - db.students.count({age: {$gt: 20}}, {name: 1, age: 1}) - deprecated: Collection.count() - return: 2 - // = select count(*) from students where age > 20; - db.students.find() - ec2 - unzip country.zip - mongoimport --file country.json --db world2 --collection country - mongoimport --file city.json --db world2 --collection city - mongoimport --file countrylanguage.json --db world2 --collection countrylanguage - use world2 - show collections - db.country.find() - db.country.find().limit(1) - db.country.find().limit(2) - db.country.find().limit(1).skip(1) - return the 2nd document - db.country.find({}, {code: 1, GNP: 1, \_id: 0}).sort({GNP:1}).limit(5) - sort by increasing order - db.country.find({}, {code: 1, GNP: 1, \_id: 0}).sort({GNP:-1}).limit(5) - sort by decreasing order - db.country.find({}, {code: 1, GNP: 1, \_id: 0}).sort({GNP:-1}).limit(10) - db.country.find().limit(1) - db.country.distinct('Continent') - db.country.distinct('Continent').length - db.country.distinct('Continent', {GNP: {\$gt: 10000000}}) - return [] // zero - db.country.distinct('Continent', {GNP: {\$gt: 1000000}}) - return ['Asia', 'Europe', 'North America'] - two arguments - the 2nd argument is condition - // = select distinct continent from country where GNP > 1000000; - db.country.find({GNP: {\$gt: 1000000}}).distinct('Continent') - TypeError - // select continent, max(GNP) from country group by continent - db.country.aggregate({\$group: {_id: '\$Continent'}}) - return - [ - { _id: 'Asia'}, - { _id: 'South America' }, - { _id: 'North America' }, - { _id: 'Europe' }, - { _id: 'Oceania' }, - { _id: 'Antarctica' }, - { _id: 'Africa' } - ] - db.country.aggregate({\$group: {_id: 'Continent'}}) - return [ { _id: 'Continent'}] - db.country.aggregate({\$group: {_id: 'Continent', max_gnp: {\$max: '\$GNP'}}}) - return [ { _id: 'Continent', max_gnp: 8510700 } ] - db.country.aggregate({\$group: {_id: '\$Continent', max_gnp: {\$max: '\$GNP'}}}) - return - [ - { _id: 'Asia', max_gnp: 3787042}, - { _id: 'South America', max_gnp: 776739 }, - { _id: 'North America', max_gnp: 8510700 }, - { _id: 'Europe', max_gnp: 2133367 }, - { _id: 'Oceania', max_gnp: 351182 }, - { _id: 'Antarctica', max_gnp: 0 }, - { _id: 'Africa', max_gnp: 116729 } - ] - // = select Continent, max(GNP) from country group by Continent; - db.country.aggregate({\$group: {_id: '\$Continent', stat: {$max: '\$GNP'}}}) - db.country.aggregate({\$group: {_id: '\$Continent', stat: {$avg: '\$GNP'}}}) - db.country.aggregate({\$group: {_id: '\$Continent', stat: {$sum: '\$GNP'}}}) - db.country.aggregate({\$group: {_id: '\$Continent', stat: {$count: '\$GNP'}}}) - MongoServerError - db.country.aggregate({\$group: {_id: '\$Continent', stat: {$sum: 1}}) - return the 'count' result - why?? - 每条记录都按1算 - 这样sum一下当然就是count - // = select Continent, max(GNP) from country group by Continent having count(*) > 30 - db.country.aggregate({\$group: {_id: '\$Continent', max_gnp: {$max: '$GNP'}, cnt: {$sum: 1}}}) - ![](https://i.imgur.com/A5M3UL3.png) - only done the group by part, having part is not yet done - db.country.aggregate({\$group: {_id: '\$Continent', max_gnp: {\$max: '\$GNP'}, cnt: {$sum: 1}}}, {\$match: {cnt: {$gt: 30}}}) - ![](https://i.imgur.com/2E51JOb.png) - db.country.aggregate({\$group: {_id: '\$Continent', max_gnp: {\$max: '\$GNP'}, cnt: {\$sum: 1}}}, {\$match: {cnt: {\$gt: 30}}},{\$sort: {cnt: -1}}) - db.country.aggregate({\$group: {_id: '\$Continent', max_gnp: {\$max: '\$GNP'}, cnt: {\$sum: 1}}}, {\$match: {cnt: {\$gt: 30}}}, {\$sort: {cnt: -1}}, {\$limit: 2}) - db.country.aggregate({\$group: {_id: '\$Continent', max_gnp: {\$max: '\$GNP'}, cnt: {\$sum:1}}}, {\$match: {cnt: {\$gt: 30}}}, {$sort: {cnt: -1}}, {\$limit: 2}, {\$skip: 1}) - db.country.aggregate({\$group: {_id: '\$Continent', max_gnp: {\$max: '\$GNP'}, cnt: {\$sum:1}}}, {\$match: {cnt: {$gt: 30}}}, {\$sort: {cnt: -1}}) - [ { _id: 'Africa', max_gnp: 116729, cnt: 58 }, { _id: 'Asia', max_gnp: 3787042, cnt: 51 }, { _id: 'Europe', max_gnp: 2133367, cnt: 46 }, { _id: 'North America', max_gnp: 8510700, cnt: 37 } ] - db.country.aggregate({\$group: {_id: '\$Continent', max_gnp: {\$max: '\$GNP'}, cnt: {\$sum:1}}}, {\$match: {cnt: {\$gt: 30}}}, {\$sort: {cnt: -1}}, {\$limit: 1}) - db.country.aggregate({$group: {\_id: '\$Continent', max_gnp: {\$max: '$GNP'}, cnt: {\$sum: 1}}}, {\$match: {cnt: {\$gt: 30}}},{\$sort: {cnt: -1}},{\$limit: 1}, {\$skip: 1}) - get nothing, beacase one documnt return - db.country.aggregate({$group: {\_id: '\$Continent', max_gnp: {\$max: '\$GNP'}, cnt: {\$sum: 1}}}, {\$match: {cnt: {$gt: 30}}},{\$sort: {cnt: -1}},{\$limit: 1}, {\$skip: 1}, {\$project: {cnt: 0}}) - value discussion - sort's value should be either 1 or -1 - project's value should bd either 1 or 0 - db.country.aggregate({\$match: {GNP: {$gt: 10000}}},{\$group: {_id: '\$Continent', max_gnp: {$max: '$GNP'}, cnt: {$sum: 1}}}, {\$match: {cnt: {$gt: 30}}},{\$sort: {cnt: -1}},{\$limit: 1}, {\$skip: 1}, {$project: {cnt: 0}}) - db.country.aggregate({$match: {GNP: {\$gt: 10000}}},{\$group: {_id: '\$Continent', max_gnp: {$max: '$GNP'}, cnt: {\$sum: 1}}}, {\$match: {cnt: {$gt: 30}}},{\$sort: {cnt: -1}},{\$limit: 1}, {\$skip: 1}) - // = select Continent, max(GNP) from country where GNP > 10000 group by Continent having count(*) > 30 order by count(*) desc limit 1 offset 1; - where follows by group by, follows by having - JOIN - db.city.find().limit(1) - db.country.find().limit(1) - // = select country.Name, city.Name from country join city on country.Capital = city.ID; - db.country.aggregate({$lookup: {from: 'city', localField: 'Capital', foreignField: 'ID', as: 'res'}}) - every country join at most one city - db.country.aggregate({$match: {Code: 'USA'}}, {$lookup: {from: 'city', localField: 'Capital', foreignField: 'ID', as: 'res'}}) - db.country.aggregate({$match: {Code: 'USA'}}, {$lookup: {from: 'city', localField: 'Capital', foreignField: 'ID', as: 'res'}}, {$project: {Name: 1, 'res.Name': 1, _id: 0}}) - return: [{ Name: 'United States', res: [ { Name: 'Washington' } ] }] # Week 13 - ec2 instance - sudo service mysqld stop - cd etc/hadoop - nano core-site.xml - comment the added property - nano hdfs-site.xml - comment the added property - cat core-site.xml - cat hdfs-site.xml - cd - ls - copy WordCount.java to folder ~/dsci-551 - cd dsci-551 - ls - cat WordCount.java - extends means? - subclass extends superclass - params in the classname - Mapper <Object, Text, Text, IntWritable> - Mapper <[Input key], [Input value], [Output key], [Output value]> - Type in Hadoop - Text in Hadoop => String in Java - IntWritable => Integer - Hadoop used their own defined datatype for the efficiency of Shulffing - Map task serializes the output and send it to reduce task - json.dump() - Reduce task deserialized the received data - json.load() - compile MapReduce code - cat /etc/bashrc - hadoop com.sun.tools.javac.Main WordCount.java - ls WordCount*.class - WordCount$TokenizerMapper.class - WordCount$IntSumReducer.class - $: inner class - jar cf wc.jar WordCount*.class - cd input/ - ls -l - hello1.txt, hellow2.txt - cat hello1.txt - cat hello2.txt - cd .. - hadoop jar wc.jar WordCount input output - Note - In this example, - 2 files in input folder => 2 map tasks - There are 3 files in the output folder - _SUCCESS file is a signal - other 2 files mean 2 partitions. They are from reduce tasks. - terminal - cd output - ls - cd .. - cat WordCount.java - The sample code deleted the following lines: - job.setCombinerClass(IntSumReducer.class); - job.setNumReduceTaks(2); - cat input/hello1.txt - cat input/hello2.txt - cat output/part-r-00000 - cat output/part-r-00001 - output files are sorted - What happened if we only have one reduce task? - nano WordCount.java - comment the following line - // job.setNumReduceTask2(2); - history - !1014 - hadoop com.sun.tools.javac.Main WordCount.java - !jar - jar cf wc.jar WordCount*.class - hadoop jar wc.jar WordCount input output-r1 - cd output-r1 - ls - only 2 files - part-r-00000 _SUCCESS - cat part-r-00000 - data is sorted by key - input & output - cd .. - cd input/ - ls - cat hello1.txt - for every line, you create input k,v pairs here - key is the byte offset of the line - value is the content of a line - cd .. - cat WordCount.java - ls input/ - 2 files - nano input/hello1.txt - For input of the map task, there will be 3 k-v pairs. Each presents a line. - for the content like this - hello world - hello this world - hello that world - <0, 'hello world'> - <12, 'hello this world'> - <29, 'hello that world'> - cat WordCount.java - ![](https://i.imgur.com/1YfIfv3.jpg) # Week 14 - ec2 - cd dsci-551 - cd input - ls -l - cat hello1.txt - content - hello world - hello this world - hello that world - cd .. - cat WordCount.java - 3 input k-v pairs for the TokenizerMapper - corresponding with 3 lines - (0, 'hello world') - (12, 'hello this world') - (29, 'hello that world') - for the map(Object key, Text, value, Context context) - input - key: 0, value: hello world - key: 1, value: hello this world - key: 2, value: hello that world - output - (hello, 1) (world, 1) - (hello, 1) (this, 1) (world, 1) - (hello, 1) (that, 1) (world, 1) - Then we the output group by key - (hello, [1, 1, 1]) (world, [1, 1, 1]) (this, [1]) (that, [1]) - for the reduce(Text, key, Iterable<IntWritable> values, Context context) - input - (hello, [1, 1, 1]) - (world, [1, 1, 1]) - (this, [1]) - (that, [1]) - Input is handled by FileInputFormat - Output is handled by FileOutputFormat - cat SQL2MR.java - hadoop com.sun.tools.javac.Main WordCount.java - jar cf wc.jar WordCount*.class - ls output* - rm -rf output* - jar cf wc.jar WordCount input output - cd output/ - ls - cd .. - cat input/hello1.txt - cat input/hello2.txt - cat output/part-r-00000 - cat WordCount.java - In this example, the combiner and reducer use the same class. - Combiner is a local reduction. - Locally group 3 * (hello, 1) by key to (hello, 3) - Combiner runs on the machine running Map tasks. - EC2 - Spark - pyspark - **country = spark.read.json('country.json')** - country.show(2) - country.limit(2).show() - country.select('Continent').show(2) - country.select('Continent').count() - country.select('Continent').distinct().show() - country.where('GNP > 10000').select('Continent').distinct().show() - country.where('GNP > 10000').select('Continent').distinct().orderBy('Continent', ascending=False).show() - country.groupBy('Contint').max('GNP').show() - import pyspark.sql.functions as fc - country.groupBy('Contint').agg(fc.max('GNP')).show() - country.groupBy('Contint').agg(fc.max('GNP').alias('max_gnp'), fc.count(*).alias('cnt')).show() - select Continent, max(GNP) max_gnp, count(*) cnt from country group by Continent - country.createOrReplaceTempView('country') - spark.sql('select Continent, max(GNP) max_gnp, count(*) cnt from country group by Continent').show() - = country.groupBy('Contint').agg(fc.max('GNP').alias('max_gnp'), fc.count(*).alias('cnt')).show() - spark.sql('select Continent, max(GNP) max_gnp, count(*) cnt from country group by Continent having cnt > 5').show() - = country.groupBy('Continent').agg(fc.max('GNP').alias('max_gnp'), fc.count('*').alias('cnt')).filter('cnt > 5').show() - **city = spark.read.json('city.json')** - city.show(2) - city.createOrReplaceTempView('city') - spark.sql('select country country.Name, city.Name from country, city where country.Capital = City.ID').show() - spark.sql('select country country.Name, city.Name from country, city where country.Capital = City.ID').count() - 232 - country.filter('Capital = 0').show() - country.join(city, country.Capital == city.ID).count() - 232 - country.join(city, country.Capital == city.ID, how='left').count() - 239 - country.join(city, country.Capital == city.ID, how='right').count() - 4079 - city.count() - 4079 - country.join(city, country.Capital == city.ID, how='full').count() - 4086 - **cl.spark.read.json('countrylanguage.json')** - cl.show(2) - cl.filter('CountryCode="USA"').show() - usa = cl.filter('CountryCode="USA"').select('Language').show() - can = cl.filter('CountryCode="CAN"').select('Language').show - usa.intersect(can).show() - usa.intersectAll(can).show() - usa.unionAll(can).count() - 24 - usa.union(can).count() - 24 - union doesn't remove duplicates - usa.union(can).distinct().count() - remove duplicates in this way - usa.show() - can.show() - usa.subtract(can).show() - Spark - spark - sc - data = sc.parallelize([2,2,3,5,4,8,10], 2) - def printf(p) - prnt(list(p)) - data.foreachPartition(printf) - [2, 2, 3] - [5, 4, 8, 10] - data.getNumParitions() - 2 - data.collect() - [2, 2, 3, 5, 4, 8. 10] - data.take(2) - [2, 2] - data.count() - 7 - data.sum() - 34 - data.max() - 10 - data.in() - 2 - data.mean() - 4.85... - data.reduce(lambda U, x: U + x) - 34 - import functools as fl - fl.reduce - <built-in function reduce> - def add(U, x): - return U + x - fl.reduce(add, [2, 2, 3]) - 7 - fl.reduce(add, [5, 4, 8, 10]) - 27 - fl.reduce(add, [7, 27]) - 34 - data.sum() - = fl.reduce(add) - 34 - data.max() - = data.reduce(max) - 10 - data.min() - = data.reduce(min) - 2 - data.count() - 7 - the reduce func can only take one func as param - for the count() feature, we need 2 func in reduce for count purpose - one for count, the other for sum - = data.aggregate( xx, xx, xx ) - the data.aggregate(0, combFunc, redFunc) have 3 params - 0: initial value used for both combiner and reducer - combFunc: combine function - redFunc: reducer function - fl.reduce(lambda U, x: combFunc, [2, 2, 3], 0) - fl.reduce(lambda U, x: U + 1, [2, 2, 3], 0) - 3 - equals - U = 0 - for x in [2, 2, 3]: - U = U + 1 - def combFunc(U, x): - return U + 1 - fl.reduce(lambda U, x: U + 1, [2, 2, 3], 0) - 3 - fl.reduce(lambda U, x: U + 1, [5, 4, 8, 10], 0) - 4 - fl.reduce(lambda U, x: U + x, [3, 4], 0) - 7 - def redFunc(U, x): - return U + x - data.aggregate(0, combFunc, redFunc) - 7 - data.mean() - 4.8... - data.aggregate((0.0), lambda U, x: (U[0] + x, U[1] + 1), lambda U, V: U[0] + V[0], U[1], V[1])) - (34, 7) - res = data.aggregate((0.0), lambda U, x: (U[0] + x, U[1] + 1), lambda U, V: U[0] + V[0], U[1], V[1])) - res[0] / res[1] - 4.8 ... - lines = sc.textFile('input/hello1.txt', 2) - lines.collect() - def printf(p): - print(list(p)) - lines.foreachPartition(printf) - ['hello world', 'hello this world'] - ['hello that world'] - line.map(lambda s: s.split()).collect() - [['hello', 'world'], ['hello', 'this', 'world'], ['hello', 'that', 'world']] - line.flatMap(lambda s: s.split()).collect() - ['hello', 'world', 'hello', 'this', 'world', 'hello', 'that', 'world'] - data = line.map(lambda s: s.split()).collect() - data.foreachPartition(printf) - [['hello', 'world'], ['hello', 'this', 'world']] - ['hello', 'that', 'world'] - data = line.flatMap(lambda s: s.split()).collect() - data.foreachPartition(printf) - ['hello', 'world', 'hello', 'this', 'world'] - ['hello', 'that', 'world'] - Note - Q: Do map() and flatMap() data shuffling. - A: No shuffling in map() and flatMap()。 - ec2 - data = lines.flatMap(lambda s: s.split()) - data.collect() - ['hello', 'world', 'hello', 'this', 'world', 'hello', 'that', 'world'] - data.map(lambda t: (t, 1)).collect() - [('hello', 1), ('world', 1), ('hello', 1), ('this', 1), ('world', 1), ('hello', 1), ('that', 1), ('world', 1)] - data.map(lambda t: (t, 1)).groupByKey().collect() - note: value is <object> - data.map(lambda t: (t, 1)).groupByKey().mapValues(list).collect() - [('world', [1, 1, 1]), ('this', [1]), ('hello', [1, 1, 1]), ('that', [1])] - data.map(lambda t: (t, 1)).groupByKey().map(lambda kv: (kv[0], list(kv[1]))).collect() - [('world', [1, 1, 1]), ('this', [1]), ('hello', [1, 1, 1]), ('that', [1])] - data.map(lambda t: (t, 1)).reduceByKey(lambda U, x: U + x).collect() - [('world', 3), ('this', 1), ('hello', 3), ('that', 1)] - data.map(lambda t: (t, 1)).reduceByKey(lambda U, x: U + x).foreachPartition(printf) - [('world', 3), ('this', 1)] - [('hello', 3), ('that', 1)] - lines.foreachPartition(printf) - ['hello world', 'hello this world'] - ['hello that world'] - ec2 - country = spark.read.json('country.json') - country_rdd = country.rdd - country_rdd - MapPartitionsRDD[51] at javaToPython at NativeMethodAccessorImp.java:0 - country_rdd.take(2) - country_rdd.collect() - country_rdd.count() - 239 - country_rdd.take(1) - country_rdd.map(lambda r: r['GNP']).take(2) - country_rdd.map(lambda r: r['GNP'] > 10000).take(2) - Find the maximum GNP in the continent. - country_rdd.map(lambda r: (r['Continent'], r['GNP']).collect() - country_rdd.map(lambda r: (r['Continent'], r['GNP'])).reduceByKey(lambda U, x: max(U, x)).collect() - [('North America', 8510700.0), ('Asia', 3787042.0), ('Africa', 116729.0), ('Europe', 2133367.0), ('South America', 776739.0), ('Oceania', 351182.0), ('Antarctica', 0.0)] - country_rdd.map(lambda r: r['Continent']).collect() - country_rdd.map(lambda r: r['Continent']).distinct().collect() - country_rdd.take(2) - country_rdd.map(lambda r: (r['Continent'], 'GNP')).collect() - country_rdd.map(lambda r: (r['Continent'], r['GNP'])).sortByKey(False).take(10) - country_rdd.map(lambda r: (r['Continent'], 'GNP')).sortByKey().take(10) - JOIN - rdd1 = sc.parallelize([(1, 2), (1, 3), (2, 4)], 2) - rdd2 = sc.parallelize([(1, 2), (3, 4)], 2) - rdd1.join(rdd2).collect() - [(1, (2, 2)), (1, (3, 2))] - \# inner join - rdd1.leftOuterJoin(rdd2).collect() - [(1, (2, 2)), (1, (3, 2)), (2, (4, None))] - rdd1.rightOuterJoin(rdd2).collect() - [(1, (2, 2)), (1, (3, 2)), (3, (None, 4))] - \# (k, v) (k, w) => (k, (v, w)) - Union & Intersect & Subtract - rdd1.union(rdd2).collect() - [(1, 2), (1, 3), (2, 4), (1, 2), (3, 4)] - rdd1.union(rdd2).foreachPartition(prinf) - [(1, 2)] - [(1, 3), (2, 4)] - [(1, 2)] - [(3, 4)] - rdd1.foreachPartition(prinf) - [(1, 2)] - [(1, 3), (2, 4)] - rdd2.foreachPartition(prinf) - [(1, 2)] - [(3, 4)] - rdd1.intersection(rdd2).collect() - rdd1.subtract(rdd2).collect() - rdd1 = sc.parallelize([(1, 2), (1, 3), (2, 4), (2, 4)], 2) - rdd1.subtract(rdd2).collect() - [(2, 4), (2, 4), (1, 3)]