# Python scraper ###### tags: `notes` https://github.com/REMitchell/python-scraping ### Python虛擬環境 ``` $ virtualenv scrapingEnv $ cd scrapingEnv $ source bin/activate ``` ## WEB CRAWLER ```python from urllib.request import urlopen from urllib.error import HTTPError from bs4 import BeautifulSoup def getTitle(url): try: html = urlopen(url) # 主機找不到此頁面 (e.g. 404 Page Not Found) except HTTPError as e: return None try: bsObj = BeautifulSoup(html.read(), "lxml") title = bsObj.body.h1 # 找不到主機 (e.g. 斷線, 打錯網址) => None Obj except AttributeError as e: return None return title title = getTitle("http://www.pythonscraping.com/pages/page1.html") if title == None: print("Title could not be found") else: print(title) ``` ___ ```python from urllib.request import urlopen from bs4 import BeautifulSoup import re # regular expression html = urlopen('http://www.pythonscraping.com/pages/warandpeace.html') bs = BeautifulSoup(html, "html.parser") # findAll(tag, attr, recursive, text, limit, keywords) nameList = bs.findAll('span', {'class': 'green'}) for name in nameList: print(name.get_text()) # .get_text 去掉標籤 for child in bs.find('table',{'id':'giftList'}).children: # vs. descendants print(child) # 印出 tr, th, td ... # for sibling in bs.table.tr.next_siblings: # 排除標題列 # .previous_siblings, .parent images = bs.find_all('img', {'src':re.compile('\.\.\/img\/gifts/img.*\.jpg')}) for image in images: print(image['src']) # bs.find_all(lambda tag: len(tag.attrs) == 2) ``` ___ ```python # wiki文章頁面的URL不包含冒號,但檔案上傳頁、討論頁有冒號 # python 預設遞迴限制 1000 次 from urllib.request import urlopen from bs4 import BeautifulSoup import re pages = set() def getLinks(pageUrl): global pages html = urlopen('http://en.wikipedia.org{}'.format(pageUrl)) bs = BeautifulSoup(html, 'html.parser') for link in bs.find_all('a', href=re.compile('^(/wiki/)')): if 'href' in link.attrs: if link.attrs['href'] not in pages: #We have encountered a new page newPage = link.attrs['href'] print(newPage) pages.add(newPage) getLinks(newPage) getLinks('') ``` ```python= from urllib.request import urlopen from urllib.parse import urlparse from bs4 import BeautifulSoup import re # urlparse(startingPage).scheme => https # urlparse(startingPage).netloc => www.w3schools.com #Retrieves a list of all Internal links found on a page def getInternalLinks(bs, includeUrl): includeUrl = '{}://{}'.format(urlparse(includeUrl).scheme, urlparse(includeUrl).netloc) internalLinks = [] #Finds all links that begin with a "/" for link in bs.find_all('a', href=re.compile('^(/|.*'+includeUrl+')')): if link.attrs['href'] is not None: if link.attrs['href'] not in internalLinks: if(link.attrs['href'].startswith('/')): internalLinks.append(includeUrl+link.attrs['href']) else: internalLinks.append(link.attrs['href']) return internalLinks #Retrieves a list of all external links found on a page def getExternalLinks(bs, excludeUrl): externalLinks = [] #Finds all links that start with "http" that do #not contain the current URL for link in bs.find_all('a', href=re.compile('^(http|www)((?!'+excludeUrl+').)*$')): if link.attrs['href'] is not None: if link.attrs['href'] not in externalLinks: externalLinks.append(link.attrs['href']) return externalLinks allExtLinks = set() allIntLinks = set() def getAllExternalLinks(siteUrl): html = urlopen(siteUrl) domain = '{}://{}'.format(urlparse(siteUrl).scheme, urlparse(siteUrl).netloc) bs = BeautifulSoup(html, 'html.parser') internalLinks = getInternalLinks(bs, domain) externalLinks = getExternalLinks(bs, domain) for link in externalLinks: if link not in allExtLinks: allExtLinks.add(link) print(link) for link in internalLinks: if link not in allIntLinks: allIntLinks.add(link) getAllExternalLinks(link) allIntLinks.add('http://oreilly.com') getAllExternalLinks('http://oreilly.com') ``` ``` $ scrapy project Name ``` ___ * GET : 透過瀏覽器的位址列前往網站的動作 * POST : 傳送資料給主機上的某個後端程式 * PUT : 更新一個物件或資訊 (大多被 POST 取代) * DELETE : 刪除物件 ```python import json jsonString = '''{ ...... }''' jsonObj = json.loads(jsonString) print(jsonObj.get("...")) ``` ___ ## STORE DATA * 儲存媒體檔參考(URL) -> hotlinking * 儲存媒體檔檔案 ```python import os from urllib.request import urlretrieve from urllib.request import urlopen from bs4 import BeautifulSoup # urllib.request.urlretrieve 從遠端 URL 下載檔案 downloadDirectory = 'downloaded' baseUrl = 'http://pythonscraping.com' def getAbsoluteURL(baseUrl, source): if source.startswith('http://www.'): url = 'http://{}'.format(source[11:]) elif source.startswith('http://'): url = source elif source.startswith('www.'): url = source[4:] url = 'http://{}'.format(source) else: url = '{}/{}'.format(baseUrl, source) if baseUrl not in url: return None return url def getDownloadPath(baseUrl, absoluteUrl, downloadDirectory): path = absoluteUrl.replace('www.', '') path = path.replace(baseUrl, '') path = downloadDirectory+path directory = os.path.dirname(path) if not os.path.exists(directory): os.makedirs(directory) return path html = urlopen('http://www.pythonscraping.com') bs = BeautifulSoup(html, 'html.parser') downloadList = bs.findAll(src=True) for download in downloadList: fileUrl = getAbsoluteURL(baseUrl, download['src']) if fileUrl is not None: print(fileUrl) urlretrieve(fileUrl, getDownloadPath(baseUrl, fileUrl, downloadDirectory)) ``` ```python import pymysql # 需要額外裝(好像沒有 pip install) # MySQL 預設 DATABASE CHARACTER SET, TABLE CHARACTER SET, VARCHAR 為 utf8mb4 -> ALTER utf8mb4_unicode_ci conn = pymysql.connect(host='127.0.0.1', unix_socket='/tmp/mysql.sock', user='root', passwd='root', db='mysql', charset='utf8') cur = conn.cursor() cur.execute('USE scraping') cur.execute('SELECT * FROM ... WHERE ...') print(cur.fetchone()) try: cur.execute("INSERT INTO ... VALUES ...") cur.connection.commit() finally: cur.close() conn.close() ``` ## MAIL ```python import smtplib from email.mime.text import MIMEText msg = MIMEText('The body of the email is here') msg['Subject'] = 'An Email Alert' msg['From'] = 'ryan@pythonscraping.com' msg['To'] = 'webmaster@pythonscraping.com' s = smtplib.SMTP('localhost') s.send_message(msg) s.quit() ``` ___ ## ENCODING * ASCII : '0' + 7-bit * UTF-8 : 表現一個字元至少 8 bit,最多 4 bytes * '0' + 7-bit : 與 ASCII 表達字元相同 * '1' + .... : 超多 ASCII 表達範圍 ```python html = urlopen("http://en.wikipedia.org/wiki/Python_(programming_language)") bs = BeautifulSoup(html, "html.parser") content = bs.find("div", {"id":"mw-content-text"}).get_text() content = bytes(content, "UTF-8") content = content.decode("UTF-8") print(content) # check <meta charset="utf-8"> ``` ## CSV ```python import csv from urllib.request import urlopen from bs4 import BeautifulSoup html = urlopen('http://en.wikipedia.org/wiki/Comparison_of_text_editors') bs = BeautifulSoup(html, 'html.parser') # The main comparison table is currently the first table on the page table = bs.findAll('table',{'class':'wikitable'})[0] rows = table.findAll('tr') csvFile = open('editors.csv', 'wt+') # wt 寫文件時會用 \r\n 表示換行 writer = csv.writer(csvFile) try: for row in rows: csvRow = [] for cell in row.findAll(['td', 'th']): csvRow.append(cell.get_text()) writer.writerow(csvRow) finally: csvFile.close() ``` ```python import csv from urllib.request import urlopen from io import StringIO data = urlopen('http://pythonscraping.com/files/MontyPythonAlbums.csv').read().decode('ascii', 'ignore') dataFile = StringIO(data) # 會印出標題列 csvReader = csv.reader(dataFile) for row in csvReader: print(row) # 標題列不印出 dictReader = csv.DictReader(dataFile) print(dictReader.fieldnames) for row in dictReader: print(row) ``` ___ ## CLEAN DATA https://ithelp.ithome.com.tw/articles/10222163 ```python import re import string content = re.sub('\n|[[\d+\]]', ' ', content) # content = re.sub('\n+', ' ', content) # content = re.sub('[[0-9]*\]', ' ', content) # content = re.sub(' +', ' ', content) content = bytes(content, 'UTF-8') # 消除跳脫字元 content = content.decode('ascii', 'ignore') content = content.split(' ') words=[] for item in content: item = item.strip(string.punctuation + string.whitespace) if len(item) > 1 or (item.lower() == 'a' or item.lower() == 'i'): words.append(item) ``` ``` from collections import OrderedDict from collections import Counter import operator sortedNGrams = sorted(ngrams.items(), key=operator.itemgetter(1), reverse=True) ``` ## FORMS * `<form enctype=" ... ">` * application/x-www-form-urlencoded : 預設,發送前編碼所有字符 * multipart/form-data : 不對字符編碼,在使用包含文件上傳的表單時須使用該值 * text/plain : 空格轉換成 "+" 加號,但不對特殊字符編碼 ```python import requests # 傳送資料的目的地網址 -> <form method="POST" action=" ... " > # 填資料的欄位 -> <input type="text" name=" ... " > params = {'firstname': 'Ryan', 'lastname': 'Mitchell'} r = requests.post("http://pythonscraping.com/pages/processing.php", data=params) print(r.text) # 填資料的欄位 -> <input type="file" name=" ... " > files = {'uploadFile': open('files/Python-logo.png', 'rb')} # 二進位檔案讀取 r = requests.post('http://pythonscraping.com/pages/processing2.php', files=files) print(r.text) ``` ```python import requests params = {'username': 'Ryan', 'password': 'password'} r = requests.post('http://pythonscraping.com/pages/cookies/welcome.php', params) print('Cookie is set to:') print(r.cookies.get_dict()) r = requests.get('http://pythonscraping.com/pages/cookies/profile.php', cookies=r.cookies) print(r.text) ``` ```python import requests from requests.auth import AuthBase from requests.auth import HTTPBasicAuth auth = HTTPBasicAuth('ryan', 'password') r = requests.post(url='http://pythonscraping.com/pages/auth/login.php', auth=auth) print(r.text) ``` ___ ## JAVASCRIPT * Ajax : 用來對 web server 傳送或接收資料,但不必為此多請求一個新頁面的技術 * DHTML : 隨著用戶端 script 修改,頁面內容而有所變化 * Selenium + PhantomJS(headless瀏覽器) ```python from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC chrome_options = Options() chrome_options.add_argument("--headless") driver = webdriver.Chrome(executable_path='drivers/chromedriver', options=chrome_options) driver.get('http://pythonscraping.com/pages/javascript/ajaxDemo.html') try: element = WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, 'loadedButton'))) finally: print(driver.find_element_by_id('content').text) driver.close() # pageSource = driver.page_source # bsObj = BeautifulSoup(pageSource) # print(bsObj.find(id="content").get_text()) # selector # driver.find_elements_by_id('content').text # locator # driver.find_elements(By.ID, 'content').text # ID, CLASS_NAME, CSS_SELECTOR, LINK_TEXT, PARTIAL_LINK_TEXT, NAME, TAG_NAME, XPATH ``` ```python from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.remote.webelement import WebElement from selenium.common.exceptions import StaleElementReferenceException import time # monitor one DOM element to check whether in this page def waitForLoad(driver): elem = driver.find_element_by_tag_name("html") count = 0 while True: count += 1 if count > 20: print("Timing out after 10 seconds and returning") return time.sleep(.5) try: elem == driver.find_element_by_tag_name("html") except StaleElementReferenceException: return chrome_options = Options() chrome_options.add_argument("--headless") driver = webdriver.Chrome(executable_path='drivers/chromedriver', options=chrome_options) driver.get("http://pythonscraping.com/pages/javascript/redirectDemo1.html") waitForLoad(driver) print(driver.page_source) driver.close() ```