所謂 Coroutine 就是一個可以暫停將執行權讓給其他 Coroutine 或 Awaitables obj 的函數,等其執行完後再繼續執行,並可以多次的進行這樣的暫停與繼續
async / await 是 Python 3.5+ 之後出現的語法糖,讓 Coroutine 與 Coroutine 之間的調度更加清楚
簡單來說:
使用 async 用來宣告一個 native Coroutine
async def read_data(db):
pass
使用 await 讓 Coroutine 掛起
async def read_data(db):
data = await db.fetch("SELECT ...")
有三種主要類型:coroutines、 Tasks 、Futures
Python 3.5
asyncio.get_event_loop()
先建立一個 event_loop,然後再將 Coroutine 放進 run_until_complete()
裡面,直到所有 Coroutine 運行結束。"""
python 3.5 寫法
"""
import asyncio
async def hello_world(x):
print("hello_world x" + str(x))
await asyncio.sleep(x)
loop = asyncio.get_event_loop()
loop.run_until_complete(hello_world(3))
loop.close()
Python 3.7之後(更簡潔)
asyncio.run()
一行程式就結束,不用在建立 event_loop 結束時也不需要 loop.close,因為他都幫你做完了import asyncio
async def hello_world(x):
print(“hello_world x” + str(x))
await asyncio.sleep(x)
asyncio.run(hello_world(2))
建立任務方法有兩種
# In Python 3.7+
task = asyncio.create_task(main())
# This works in all Python versions but is less readable
task = asyncio.ensure_future(main())
完整範例
import asyncio
import time
async def dosomething(num):
print(‘start{}’.format(num))
await asyncio.sleep(num)
print(‘sleep{}’.format(num))
async def main():
task1 = asyncio.create_task(dosomething(1))
task2 = asyncio.create_task(dosomething(2))
task3 = asyncio.create_task(dosomething(3))
await task1
await task2
await task3
if __name__ == “__main__”:
time_start = time.time()
asyncio.run(main())
print(time.time() - time_start)
# >> start1
# >> start2
# >> start3
# >> sleep1
# >> sleep2
# >> sleep3
# >> 3.0052239894866943
使用 asyncio.gather( )
,可同時放入多個 Coroutines 或 awaitable object 進入事件循環 (event loop),等 Coroutines 都結束後,並依序收集其回傳值
asyncio.gather( *aws, loop=None, return_exceptions=False)
到目前為止你已經可以運行多個協程任務
pip install gevent
Gevent 是一種基於協程的 Python 網路庫,它用到 Greenlet 提供的,封裝了 libevent 事件迴圈的高層同步 API。它讓開發者在不改變程式設計習慣的同時,用同步的方式寫非同步 I/O 的程式碼
它的協程是基於 greenlet 的
使用 Gevent 的效能確實要比用傳統的執行緒高,甚至高很多
spawn(function, *args, **kwargs) # → Greenlet
創建一個新的 Greenlet 對象,安排它運行函數 funtion(*args, **kwargs)
import gevent
def test1():
print("12")
gevent.sleep(0)
print("34")
def test2():
print("56")
gevent.sleep(0)
print("78")
gevent.joinall([
gevent.spawn(test1),
gevent.spawn(test2),
])
gevent.spawn()
方法會創建一個新的 greenlet 協程對象,並運行它。 gevent.joinall()
方法會等待所有傳入的 greenlet 協程運行結束後再退出,這個方法可以接受一個 timeout 參數來設置超時時間,單位是秒
執行順序如下:
gevent.sleep(0)
時,test1 被阻塞,自動切換到協程 test2,打印 56greenlet 一個協程運行完後,必須顯式切換,不然會返回其父協程
而在 gevent 中,一個協程運行完後,它會自動調度那些未完成的協程
import gevent
import socket
urls = ['www.baidu.com', 'www.gevent.org', 'www.python.org']
jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]
gevent.joinall(jobs, timeout=5)
print([job.value for job in jobs])
我們通過協程分別獲取三個網站的 IP 地址,由於打開遠程地址會引起 IO 阻塞,所以 gevent 會自動調度不同的協程。另外,我們可以通過協程對象的 value
屬性,來獲取協程函數的返回值
joinall(greenlets, timeout=None, raise_error=False, count=None)
参数:
greenlets
一系列greenlets去等待。
timeout
如果给出,最大的等待秒数
返回:
在 timeout 結束前一系列已經結束的 greenlets
並行的核心思想是:可以將大的任務分割成一系列的子任務,這些子任務可以同時或者異步執行,而不是每次只執行一個或者同步執行。
細心的朋友們在運行上面例子時會發現,其實程序運行的時間同不用協程是一樣的,是三個網站打開時間的總和
可是理論上協程是非阻塞的,那運行時間應該等於最長的那個網站打開時間呀?
其實這是因為 Python 標準庫裡的 socket 是阻塞式的,DNS 解析無法並發,包括像 urllib 庫也一樣,所以這種情況下用協程完全沒意義。那怎麼辦?
一種方法是使用 gevent 下的 socket 模塊,我們可以通過 from gevent import socket
來導入
不過更常用的方法是使用猴子補丁(Monkey patching)
from gevent import monkey; monkey.patch_socket()
import gevent
import socket
urls = ['www.baidu.com', 'www.gevent.org', 'www.python.org']
jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]
gevent.joinall(jobs, timeout=5)
print([job.value for job in jobs])
上述代碼的第一行就是對 socket 標準庫打上猴子補丁,此後 socket 標準庫中的類和方法都會被替換成非阻塞式的,所有其他的代碼都不用修改,這樣協程的效率就真正體現出來了。 Python 中其它標準庫也存在阻塞的情況,gevent 提供了 monkey.patch_all()
方法將所有標準庫都替換。
from gevent import monkey; monkey.patch_all()
使用猴子補丁褒貶不一,但是官網上還是建議使用 patch_all()
,而且在程序的第一行就執行
協程狀態有已啟動和已停止,分別可以用協程對象的 started
屬性和 ready()
方法來判斷
對於已停止的協程,可以用 successful()
方法來判斷其是否成功運行且沒拋異常。如果協程執行完有返回值,可以通過 value
屬性來獲取
另外,greenlet 協程運行過程中發生的異常是不會被拋出到協程外的,因此需要用協程對象的 exception
屬性來獲取協程中的異常
下面的例子很好的演示了各種方法和屬性的使用
#coding:utf8
import gevent
def win():
return 'You win!'
def fail():
raise Exception('You failed!')
winner = gevent.spawn(win)
loser = gevent.spawn(fail)
print(winner.started) # True
print(loser.started) # True
# 在Greenlet中发生的异常,不会被抛到Greenlet外面。
# 控制台会打出Stacktrace,但程序不会停止
try:
gevent.joinall([winner, loser])
except Exception as e:
# 这段永远不会被执行
print('This will never be reached')
print(winner.ready()) # True
print(loser.ready()) # True
print(winner.value) # 'You win!'
print(loser.value) # None
print(winner.successful()) # True
print(loser.successful()) # False
# 这里可以通过raise loser.exception 或 loser.get()
# 来将协程中的异常抛出
print(loser.exception)
之前我們講過在 gevent.joinall()
方法中可以傳入 timeout
參數來設置超時,我們也可以在全局範圍內設置超時時間:
import gevent
from gevent import Timeout
timeout = Timeout(2) # 2 seconds
timeout.start()
def wait():
gevent.sleep(10)
try:
gevent.spawn(wait).join()
except Timeout:
print('Could not complete')
上例中,我們將超時設為2秒,此後所有協程的運行,如果超過兩秒就會拋出 Timeout 異常
我們也可以將超時設置在with語句內,這樣該設置只在with語句塊中有效:
with Timeout(1):
gevent.sleep(10)
此外,我們可以指定超時所拋出的異常,來替換默認的Timeout異常。比如下例中超時就會拋出我們自定義的TooLong異常
class TooLong(Exception):
pass
with Timeout(1, TooLong):
gevent.sleep(10)