## async def Serial() await Serial_connection
PyConTW 2019
## <code class="green">Thank you all for joining this session</code>
r0: threading, GIL
r2: asyncio profiler
## Who am I?
- Jason
- Software engineer?
- Python :heart:
- ~~I use tabs.~~ :cat:
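### What this talk covers
+ Lessons learned building an asynchronous RS232 serial-port program with asyncio
+ Are there options besides protocols and transports?
Has anyone here received data over RS232?
Has anyone written an async program?
### Background
+ A visual inspection machine produces data for analysis
+ RS232 handles both control and data transfer
+ One computer receives data from two cameras at the same time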
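### Serial ports
+ A communication interface on a computer
+ Common examples: RS232, USB, and RJ45
Serial ports are classified by electrical standard and protocol, including RS-232-C, RS-422, RS-485, and USB. The RS-232-C, RS-422, and RS-485 standards specify only the port's electrical characteristics; they do not cover connectors, cables, or protocols. USB is a newer port standard, used mainly for high-speed data transfer.
+ UART to RS232 conversion
A universal asynchronous receiver/transmitter (UART) is a piece of computer hardware that converts data between serial and parallel form. UARTs are commonly used together with communication standards such as EIA RS-232. Physically, a UART is either a standalone chip or an on-chip peripheral of a microprocessor. It is usually paired with an RS-232C level-shifting chip, such as Maxim's MAX232, to connect to external devices.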
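### Producer-consumer problem
+ The producer keeps reading the serial port and puts any incoming data into the Queue
+ The consumer checks the Queue; when data is present it calls the API to store it in the DB, and puts it back into the Queue on failure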

    digraph {
      graph [ fontname="Source Sans Pro", fontsize=20 ];
      node [ fontname="Source Sans Pro", fontsize=18 ];
      edge [ fontname="Source Sans Pro", fontsize=12 ];
      subgraph core {
        c [label="core \nFIFO"] [shape=box]
        c -> Queue [ltail=session lhead=session]
      }
      subgraph cluster1 {
        a [label="Producer\n"] [shape=box]
        b [label="Consumer"] [shape=box]
        Queue [label="Queue" shape=plaintext ]
        b -> Queue [label="push, pull", dir="both"]
        a -> Queue [label="push", fontcolor=darkgreen]
      }
    }

We can't know in advance when data will arrive, so I wanted to design a producer-consumer model.
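
A minimal sketch of the producer-consumer idea above, using `asyncio.Queue`. The list of readings stands in for the serial port, and `flaky_store_factory` is a hypothetical stand-in for an API call that fails the first time it sees each item, so the retry path is exercised. None of these names come from the talk's code.

```python
import asyncio


async def producer(queue, readings):
    """Stand-in for the serial reader: push each reading as it 'arrives'."""
    for item in readings:
        await asyncio.sleep(0)          # pretend to wait for the port
        await queue.put(item)


async def consumer(queue, store, stored):
    """Pull items; on failure put them back so they are retried later."""
    while True:
        item = await queue.get()
        try:
            if store(item):
                stored.append(item)
            else:
                await queue.put(item)   # failed: back into the queue
        finally:
            queue.task_done()


async def run(readings, store):
    queue = asyncio.Queue()
    stored = []
    worker = asyncio.create_task(consumer(queue, store, stored))
    await producer(queue, readings)
    await queue.join()                  # wait for every item, retries included
    worker.cancel()
    await asyncio.gather(worker, return_exceptions=True)
    return stored


def flaky_store_factory():
    """Hypothetical API call that fails the first time it sees each item."""
    seen = set()

    def store(item):
        if item in seen:
            return True
        seen.add(item)
        return False

    return store
```

Calling `asyncio.run(run(['a', 'b', 'c'], flaky_store_factory()))` stores all three items after one retry each. The order in which they land depends on how the two coroutines interleave.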
### Choosing a package

    from serial import Serial

    with Serial('/dev/ttyS1', 19200, timeout=1) as ser:
        x = ser.read()          # read one byte
        s = ser.read(10)        # read up to ten bytes (timeout)
        line = ser.readline()   # read a '\n' terminated line

e.g. on Windows, ports are named `COM2`, `COM3`.
pyserial is a solid package.
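### Problems with the synchronous approach
+ A while loop that waits for data
+ [Multithreading](https://github.com/chairco/asyncio-pySerial/blob/master/thread.py) - GIL
+ A trendier approach (does 2019 still count as trendy?)
A blocking call such as time.sleep stalls everything. Multithreading can work around it, but threads are scheduled at the CPU level, and frequent context switches cost time.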
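### Is asynchronous trendy enough?
- Non-blocking, event-driven
- Non-blocking coroutines
- asyncio
There are many strategies for asynchronous programming; today covers two of them, using asyncio.
Non-blocking coroutines
Non-blocking I/O
Callback -> coroutine
Event loop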
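I read the asyncio docs and copied the pattern:

    import asyncio
    import time
    from collections import deque

    import aiohttp
    import serial

    # async_post, url_seq and url_films are not shown on this slide.

    def consumer():
        # Note: `port` actually holds the baud rate (see the settings below).
        with serial.Serial(com, port, timeout=0) as ser:
            buffer = ''
            while True:
                buffer += ser.readline().decode()
                if '\r' in buffer:
                    last_received, buffer = buffer.split('\r')[-2:]
                    yield last_received

    async def async_producer(session, q):
        start = time.time()
        while len(q):
            data = q.popleft()
            status, result = await async_post(session, url_seq, url_films, data)
            if status != 201:
                q.append(data)  # assumed: body missing on the slide; requeue on failure
            if time.time() - start >= 1:
                break  # assumed: body missing on the slide; stop retrying after one second

    q = deque()
    com = 'COM5'
    port = 9600
    last_received = ''

    async def async_main():
        global q
        async with aiohttp.ClientSession() as session:
            for data in consumer():
                q.append(data)  # assumed: the slide does not show where data enters the queue
                d = q.popleft()
                status, result = await async_post(session, url_seq, url_films, data=d)  # post data
                if status != 201:
                    q.append(d)  # assumed: body missing on the slide; requeue on failure
                if len(q) >= 2:
                    await async_producer(session, q)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_main())  # assumed: the slide stops before this call
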
### What went wrong?
- It blocks
- It is synchronous, not asynchronous
- Event loop, coroutine, Future, Task
- Wrong [example](https://github.com/chairco/kaohsiung.py_talk/blob/master/20180918_Data_Engineering_at_Traditional_Factory/async_sample/wrong_async.py)
- Correct [example](https://github.com/chairco/kaohsiung.py_talk/blob/master/20180918_Data_Engineering_at_Traditional_Factory/async_sample/rignt_async.py)
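### Explanation

    async def async_main():
        global q  # --> mistake (1)
        async with aiohttp.ClientSession() as session:
            # --> mistake (2): consumer() is a plain generator, so its serial
            # read loop never hands control back to the event loop
            for data in consumer():
                q.append(data)  # assumed: the slide does not show where data enters the queue
                d = q.popleft()
                status, result = await async_post(session, url_seq, url_films, data=d)  # post data
                if status != 201:
                    q.append(d)  # assumed: body missing on the slide; requeue on failure
                if len(q) >= 2:
                    await async_producer(session, q)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_main())  # assumed: the slide stops before this call
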
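
To see why the `for data in consumer()` loop above is a problem, here is a small self-contained sketch with no serial port involved. `blocking_source` is a hypothetical stand-in for a synchronous generator that waits inside itself, and `async_source` is the cooperative version. A background `ticker` task counts how often the event loop lets it run in each case.

```python
import asyncio
import time


def blocking_source(n, delay=0.05):
    """Synchronous generator: time.sleep() never yields to the event loop."""
    for i in range(n):
        time.sleep(delay)
        yield i


async def async_source(n, delay=0.05):
    """Async generator: asyncio.sleep() hands control back to the loop."""
    for i in range(n):
        await asyncio.sleep(delay)
        yield i


async def ticker(ticks, stop):
    """Background task that records a tick roughly every 10 ms when it can run."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def count_ticks(use_blocking):
    ticks, stop = [], asyncio.Event()
    task = asyncio.create_task(ticker(ticks, stop))
    await asyncio.sleep(0)              # give the ticker a chance to start
    if use_blocking:
        for _ in blocking_source(5):
            pass
    else:
        async for _ in async_source(5):
            pass
    stop.set()
    await task
    return len(ticks)


def compare():
    blocked = asyncio.run(count_ticks(use_blocking=True))
    cooperative = asyncio.run(count_ticks(use_blocking=False))
    return blocked, cooperative
```

`compare()` returns a much smaller tick count for the blocking case: while the synchronous generator is waiting, nothing else on the loop gets to run.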
### Why not look at how others write it?
- Creation begins with imitation :bulb:
### PySerial-asyncio :fork_and_knife:
- A project under pyserial
- Supports Python 3.4 and later
### Happy async: it really is simple :+1:

    import asyncio
    import serial_asyncio

    class Output(asyncio.Protocol):
        def connection_made(self, transport):
            self.transport = transport
            print('port opened', transport)
            transport.serial.rts = False
            transport.write(b'hello world\n')

        def data_received(self, data):
            print('data received', repr(data))

        def connection_lost(self, exc):
            print('port closed')

    loop = asyncio.new_event_loop()
    coro = serial_asyncio.create_serial_connection(loop, Output, '/dev/ttyUSB0', baudrate=115200)
    loop.run_until_complete(coro)  # opens the port, returns (transport, protocol)
    loop.run_forever()             # keep the loop alive so the callbacks can fire

### Some parts are still unclear
- `Output` inherits from `asyncio.Protocol`?
- `create_serial_connection` returns a coroutine?
### <code class="orange">Protocols</code>, <code class="blue">Transports</code>
- [PEP 3156](https://www.python.org/dev/peps/pep-3156/#transports-and-protocols), [PEP 3153](https://www.python.org/dev/peps/pep-3153/)
- low-level api
- callback-based
- should only be used in libraries and frameworks and never in high-level asyncio applications
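Originates from PEP 3153 and PEP 3156
Callback-based API
Recommended for libraries and frameworks, not for high-level applications
Next, let's see how the official documentation introduces these two APIs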
### <code class="orange">Protocols</code>, <code class="blue">Transports</code>
- At the highest level, the transport is concerned with how bytes are transmitted, while the protocol determines which bytes to transmit (and to some extent when).
- A different way of saying the same thing: a transport is an abstraction for a socket (or similar I/O endpoint) while a protocol is an abstraction for an application, from the transport’s point of view.
In short: the transport defines how bytes are sent, and the protocol defines which bytes to send.
The transport abstracts the I/O endpoint; the protocol abstracts the application.
### <code class="orange">Protocols</code>, <code class="blue">Transports</code>
- Yet another view is the transport and protocol interfaces together define an abstract interface for using network I/O and interprocess I/O.
- There is always a 1:1 relationship between transport and protocol objects: the protocol calls transport methods to send data, while the transport calls protocol methods to pass it data that has been received.
The two are paired 1:1: the protocol calls transport methods to send data, and the transport calls protocol methods to hand over data it has received.
### <code class="orange">Protocols</code>, <code class="blue">Transports</code>
- Most of connection oriented event loop methods (such as loop.create_connection()) usually accept a protocol_factory argument used to create a Protocol object for an accepted connection, represented by a Transport object. Such methods usually return a tuple of (transport, protocol).
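Most connection-oriented event loop methods (such as loop.create_connection()) accept a protocol_factory argument, use it to create a Protocol object for the connection, represent the connection itself with a Transport object, and usually return a (transport, protocol) tuple.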
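
A small sketch of that `(transport, protocol)` pairing, using a throwaway TCP echo server on localhost instead of a serial port. `EchoServer` and `Client` are illustrative names, not part of asyncio or of the talk's code.

```python
import asyncio


class EchoServer(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self.transport.write(data)      # protocol -> transport: send bytes


class Client(asyncio.Protocol):
    def __init__(self, done):
        self.done = done
        self.received = b''

    def connection_made(self, transport):
        transport.write(b'hello\n')

    def data_received(self, data):      # transport -> protocol: deliver bytes
        self.received += data
        if self.received.endswith(b'\n') and not self.done.done():
            self.done.set_result(self.received)


async def demo():
    loop = asyncio.get_running_loop()
    server = await loop.create_server(EchoServer, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    done = loop.create_future()
    # The factory builds the protocol; the loop builds the transport.
    transport, protocol = await loop.create_connection(
        lambda: Client(done), '127.0.0.1', port
    )
    reply = await done
    paired = transport.get_protocol() is protocol   # the 1:1 relationship
    transport.close()
    server.close()
    return reply, paired
```

`asyncio.run(demo())` returns the echoed bytes together with `True` for the pairing check.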
### <code class="orange">Protocols</code>
- represent applications such as HTTP client/server, SMTP and FTP
- Async http operation
- loop.create_connection()
### <code class="blue">Transports</code>
- represent connections such as sockets, SSL connections, and pipes
- Async socket operations
- usually implemented by frameworks, e.g. Tornado
protocol: the application; async HTTP; loop.create_connection()
Transports: socket connections
### Layers
[What Is Async, How Does It Work, And When Should I Use It? -A. Jesse Jiryu Davis](https://www.slideshare.net/emptysquare/what-is-async-how-does-it-work-and-when-should-i-use-it)
Borrowing a very clear diagram from Jesse: the protocol is WebSocket, and the transport is the socket.
### Into the core! :100: :muscle: :tada:
With that concept in place, let's look at the pyserial-asyncio API

    class SerialTransport(asyncio.Transport): ...
    def create_serial_connection(loop, protocol_factory, *args, **kwargs): ...
    def open_serial_connection(*, loop=None, ...

One class and two functions in total
### class SerialTransport
- Low-level API
- Subclass of `asyncio.Transport`
- The first way to go asynchronous
- Create an `asyncio.Protocol` subclass and use `SerialTransport` as its `transport`
### <code class="orange">Protocol</code>
- Callback methods tell the protocol what to do
- The transport invokes those callbacks in response to events
  - connection opened
  - data arrives
- The default callbacks do nothing, so override only the methods you care about
  - connection_made
  - connection_lost
  - data_received
### API

    class SerialTransport(asyncio.Transport):
        def __init__(self, loop, protocol, *args, **kwargs):
            self._loop = loop
            self._protocol = protocol

        def get_protocol(self):
            return self._protocol

        def set_protocol(self, protocol):
            self._protocol = protocol

        # Implement read/write methods
        # [...]

### API

    async def ser_create_connection(protocol_factory, *args, **kwargs):
        loop = asyncio.get_event_loop()
        protocol = protocol_factory()
        transport = SerialTransport(loop, protocol, *args, **kwargs)
        return transport, protocol

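
The same shape can be tried without any hardware. The sketch below swaps the serial port for a hypothetical in-memory `LoopbackTransport` that echoes every write back as received data, so the order of the protocol callbacks is visible. `LoopbackTransport`, `Recorder`, and `create_loopback_connection` are made up for illustration and are not part of pyserial-asyncio.

```python
import asyncio


class LoopbackTransport(asyncio.Transport):
    """Hypothetical transport: whatever is written comes back as received data."""

    def __init__(self, loop, protocol):
        super().__init__()
        self._loop = loop
        self._protocol = protocol
        self._closing = False
        loop.call_soon(protocol.connection_made, self)

    def get_protocol(self):
        return self._protocol

    def set_protocol(self, protocol):
        self._protocol = protocol

    def is_closing(self):
        return self._closing

    def write(self, data):
        # A real transport would push the bytes to the device instead.
        self._loop.call_soon(self._protocol.data_received, bytes(data))

    def close(self):
        if not self._closing:
            self._closing = True
            self._loop.call_soon(self._protocol.connection_lost, None)


class Recorder(asyncio.Protocol):
    def __init__(self, closed):
        self.events = []
        self.closed = closed

    def connection_made(self, transport):
        self.transport = transport
        self.events.append('made')
        transport.write(b'ping\r')

    def data_received(self, data):
        self.events.append(data)
        self.transport.close()

    def connection_lost(self, exc):
        self.events.append('lost')
        self.closed.set_result(None)


async def create_loopback_connection(protocol_factory):
    loop = asyncio.get_running_loop()
    protocol = protocol_factory()
    transport = LoopbackTransport(loop, protocol)
    return transport, protocol


async def demo():
    closed = asyncio.get_running_loop().create_future()
    transport, protocol = await create_loopback_connection(lambda: Recorder(closed))
    await closed
    return protocol.events
```

Running `asyncio.run(demo())` records `connection_made`, then the echoed bytes, then `connection_lost`: the transport drives the protocol entirely through callbacks.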
### Protocol example (Reader)

    import asyncio
    import random
    from functools import partial

    import serial_asyncio

    com = '/dev/cu.usbmodem1411'  # or '/dev/ttys006'
    baudrate = 9600

    class Reader(asyncio.Protocol):
        def __init__(self, queue):
            """Store the queue."""
            super().__init__()
            self.transport = None
            self.buf = None
            self.queue = queue

        def connection_made(self, transport):
            """Store the serial transport and prepare to receive data."""
            self.transport = transport
            self.buf = bytes()
            print('port opened', transport)

        def data_received(self, data):
            """Store characters until a carriage return is received."""
            self.buf += data
            if b'\r' in self.buf:
                # Everything before the last '\r' is complete; the tail is left over.
                *lines, self.buf = self.buf.split(b'\r')
                for recv in lines:
                    data = recv.strip()
                    # Assumed: the slide does not show the enqueue step.
                    self.queue.put_nowait(data)
                    print(f'producing: {id(data)}')

        def connection_lost(self, exc):
            print('Reader closed')

    async def consume(queue):
        """Get serial data asynchronously."""
        while True:
            data = await queue.get()
            print(f'consuming: {id(data)}')
            await asyncio.sleep(random.random())

    async def main():
        loop = asyncio.get_running_loop()
        queue = asyncio.Queue()  # the loop= argument was removed in Python 3.10
        produce = partial(Reader, queue)
        producer_coro = serial_asyncio.create_serial_connection(
            loop, produce, com, baudrate=baudrate
        )
        consumer_coro = consume(queue)
        await asyncio.gather(producer_coro, consumer_coro)

    asyncio.run(main())

### Streams
- `open_serial_connection()` is built on streams: high-level async/await-ready primitives to work with network connections.
- Streams allow sending and receiving data without using callbacks or low-level protocols and transports.
- It returns a paired `asyncio.StreamReader` / `asyncio.StreamWriter`
- You write read and write coroutine functions to process the data
High-level async/await primitives for working with connections
Send and receive data without callbacks or low-level protocols and transports
### API

    async def ser_open_connection(*args, **kwargs):
        reader = asyncio.streams.StreamReader()
        protocol = asyncio.streams.StreamReaderProtocol(reader)
        factory = lambda: protocol
        transport, _ = await ser_create_connection(factory, *args, **kwargs)
        writer = asyncio.streams.StreamWriter(transport, protocol, reader)
        return reader, writer

### Streams example

    import asyncio
    import random

    import serial_asyncio

    # serial settings
    url = '/dev/cu.usbmodem1421'
    url2 = '/dev/cu.usbserial-AL016RPE'
    baudrate = 9600

    async def produce(queue, url, **kwargs):
        """Read serial data without blocking; recv() defines the framing."""
        reader, writer = await serial_asyncio.open_serial_connection(url=url, **kwargs)
        buffers = recv(reader)
        async for buf in buffers:
            # TODO: can handle data format here
            print(f"produce id: {id(buf)}, device:{buf.split(',')[2:4]}")
            await asyncio.sleep(random.random())
            await queue.put(buf)

    async def recv(r):
        """Handle stream data with one of the StreamReader methods:
        'read', 'readexactly', 'readuntil', or 'readline'.
        """
        while True:
            msg = await r.readuntil(b'\r')
            yield msg.rstrip().decode('utf-8')

    async def consume(queue):
        """Consume serial data from the queue."""
        while True:
            # wait for data from a producer
            data = await queue.get()
            # process the data
            print(f"consuming id: {id(data)}")
            # simulate an I/O operation using sleep
            await asyncio.sleep(random.random())
            # notify the queue that the item has been processed
            queue.task_done()

    async def main():
        queue = asyncio.Queue()  # the loop= argument was removed in Python 3.10
        await asyncio.gather(
            produce(queue, url=url, baudrate=baudrate),
            produce(queue, url=url2, baudrate=baudrate),
            consume(queue),
        )

    asyncio.run(main())

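
The framing logic in `recv()` can be exercised without a device by feeding bytes into an `asyncio.StreamReader` by hand. This is only a sketch: the sample records are made up, and unlike the slide version this `recv()` stops cleanly at end of stream.

```python
import asyncio


async def recv(reader):
    """Yield one decoded record per carriage return until the stream ends."""
    while True:
        try:
            msg = await reader.readuntil(b'\r')
        except asyncio.IncompleteReadError:
            return                      # EOF reached before another '\r'
        yield msg.rstrip().decode('utf-8')


async def demo():
    reader = asyncio.StreamReader()
    # Simulate bytes arriving in arbitrary chunks, as they do from a port.
    for chunk in (b'1,2,cam', b'A,ok\r3,4,', b'camB,ok\r'):
        reader.feed_data(chunk)
    reader.feed_eof()
    return [record async for record in recv(reader)]
```

`asyncio.run(demo())` returns `['1,2,camA,ok', '3,4,camB,ok']`: records are reassembled regardless of where the chunks were split.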
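### Asynchronous reads and writes on the serial port: done :100:
### Problems you may run into
- Third-party APIs that do not support Windows
- Packages that do not support asyncio
- Is there another way?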
### Coroutine
- Generator
- Future
- Task
Coroutines: a quick pass through these components
### Generator
- `__iter__`, `__next__`
- The `yield` syntax
- Control flow
- PEP 342: the `yield` statement became an expression, `x = yield`
- Generators evolved into coroutines
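
A short illustration of the PEP 342 point: because `yield` is an expression, a generator can receive values through `send()` and keep state between calls. `line_assembler` is a made-up example that mirrors the carriage-return framing used earlier in the talk.

```python
def line_assembler():
    """Generator-based coroutine: takes chunks via send(), yields completed lines."""
    buffer = ''
    completed = []
    while True:
        chunk = yield completed         # PEP 342: yield is an expression
        buffer += chunk
        *completed, buffer = buffer.split('\r')


gen = line_assembler()
next(gen)                               # prime: run to the first yield
first = gen.send('ab')                  # no '\r' yet, so nothing is complete
second = gen.send('c\rde\rf')           # completes 'abc' and 'de'; 'f' stays buffered
```

Here `first` is `[]` and `second` is `['abc', 'de']`.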
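### Future
- Records the state of a piece of work
- Holds the result of a task that will run, or has not yet run
- Similar to a Promise
### Task
- Interacts with the event loop to run coroutines
- A coroutine object is a function that can natively pause its own execution. A Task wraps a coroutine further and tracks its states. `Task` is a subclass of `Future`: it ties a coroutine to a Future by wrapping the coroutine in a Future object.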
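
A minimal sketch of the two ideas side by side. `read_sensor` is a placeholder coroutine invented for the example.

```python
import asyncio


async def read_sensor():
    await asyncio.sleep(0.01)
    return 42


async def demo():
    loop = asyncio.get_running_loop()

    # A Future is a placeholder; someone else fills in the result later.
    future = loop.create_future()
    loop.call_later(0.01, future.set_result, 'ready')
    states = [future.done()]            # no result yet
    value = await future
    states.append(future.done())        # result is now set

    # A Task wraps a coroutine and drives it on the event loop.
    task = asyncio.create_task(read_sensor())
    is_future = isinstance(task, asyncio.Future)
    result = await task
    return states, value, is_future, result
```

`asyncio.run(demo())` returns `([False, True], 'ready', True, 42)`.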
### Crossing between the async and sync worlds
- Andrew Godwin - [Python & Async Simplified](https://www.aeracode.org/2018/02/19/python-async-simplified/)
- Django ASGI - [asgiref](https://github.com/django/asgiref)
- [Executing code in thread or process pool](https://docs.python.org/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools)
### A Django ORM example

    import asyncio
    import concurrent.futures

    # Chat is a Django model defined elsewhere in the project.
    def get_chat_id(name):
        return Chat.objects.get(name=name).id

    async def main():
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(executor, get_chat_id, "django")

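### The idea and flow, roughly
- Turn a synchronous callable into an awaitable
- Create a concurrent.futures.ThreadPoolExecutor for the synchronous code to run on
- Get the loop and pass the executor to loop.run_in_executor, which immediately returns an asyncio.Future that completes when the call finishes
- Since it is an asyncio.Future, it can of course be awaited
P.S. The executor's work runs on a different thread from the event loop, so it does not need to be scheduled inside the loop; run_in_executor hands it straight to the thread pool.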
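
The steps above as a self-contained sketch, with no Django required. `blocking_lookup` is a hypothetical stand-in for any synchronous call such as an ORM query.

```python
import asyncio
import concurrent.futures
import threading
import time


def blocking_lookup(name):
    """Stand-in for a synchronous call; reports which thread ran it."""
    time.sleep(0.05)
    return name.upper(), threading.current_thread().name


async def demo():
    loop = asyncio.get_running_loop()
    main_thread = threading.current_thread().name
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        future = loop.run_in_executor(executor, blocking_lookup, 'django')
        is_future = isinstance(future, asyncio.Future)   # returned immediately
        result, worker_thread = await future
    return is_future, result, worker_thread != main_thread
```

`asyncio.run(demo())` returns `(True, 'DJANGO', True)`: the call ran on a pool thread while the event loop thread stayed free.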
## Next, an example built on this idea
### Create a function that blocks

    from serial import Serial
    from concurrent import futures
    import asyncio
    import time

    async def run_blocking_tasks(executor):
        s = Serial('/dev/ttys005', baudrate=9600)
        msgs = [b'foo\n', b'bar\n', b'baz\n', b'qux\n', b'DONE\n']
        loop = asyncio.get_event_loop()
        f = [
            loop.run_in_executor(executor, send, s, m)
            for m in msgs
        ]
        await asyncio.wait(f)

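### The function that sends messages

    def send(ser, msg):
        print(f'sending: {msg}')
        if msg.rstrip().decode() == 'foo':
            print(f'{msg.rstrip().decode()} sleep 3s')
            time.sleep(3)  # assumed: the slide does not show the sleep it announces
        ser.write(msg)     # assumed: the slide does not show the actual write
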
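### Create a ThreadPoolExecutor, then start the event loop with asyncio.run()

    if __name__ == '__main__':
        # Three workers, matching the three thread names (0_0 to 0_2) in the output.
        executor = futures.ThreadPoolExecutor(max_workers=3)
        asyncio.run(run_blocking_tasks(executor))
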
### Output
Notice that the sleep no longer blocks the rest of the program
MainThread run_blocking_tasks: starting
MainThread run_blocking_tasks: creating executor tasks
ThreadPoolExecutor-0_0 sending: b'foo\n': running
ThreadPoolExecutor-0_1 sending: b'bar\n': running
ThreadPoolExecutor-0_2 sending: b'baz\n': running
ThreadPoolExecutor-0_1 sending: b'qux\n': running
MainThread run_blocking_tasks: waiting for executor tasks
ThreadPoolExecutor-0_2 sending: b'DONE\n': running
ThreadPoolExecutor-0_0 sending: b'foo\n': foo finish sleep 3s ---> "finishes last"
MainThread run_blocking_tasks: exiting
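### Sync -> Async: what about wrapping it?

    from asgiref.sync import sync_to_async

    @sync_to_async
    def get(ser):
        msg = ser.read_until(b'\r\n')
        return msg.rstrip().decode('utf-8')

After wrapping with `@sync_to_async`, calling `get(ser)` returns a coroutine, so it has to be awaited: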
> <coroutine object SyncToAsync.__call__ at 0x1091ab1c8>
>> __main__:2: RuntimeWarning: coroutine 'SyncToAsync.__call__' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
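### How the decorator works

    import asyncio
    import functools

    class SyncToAsync:
        def __init__(self, func):
            self.func = func

        async def __call__(self, *args, **kwargs):
            loop = asyncio.get_event_loop()
            future = loop.run_in_executor(
                None,  # None selects the loop's default executor
                functools.partial(self.thread_handler, loop, *args, **kwargs),
            )
            return await asyncio.wait_for(future, timeout=None)

        def thread_handler(self, loop, *args, **kwargs):
            return self.func(*args, **kwargs)  # runs in the worker thread

    sync_to_async = SyncToAsync
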
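
The same idea fits in a plain function decorator. The sketch below is not asgiref's implementation: `to_async` and `slow_read` are hypothetical names, and it only shows that calling the wrapped function yields a coroutine whose result arrives once it is awaited.

```python
import asyncio
import functools
import inspect
import time


def to_async(func):
    """Hypothetical helper: make a blocking callable return an awaitable."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        call = functools.partial(func, *args, **kwargs)
        return await loop.run_in_executor(None, call)   # None = default thread pool
    return wrapper


@to_async
def slow_read(port_name):
    time.sleep(0.05)                    # pretend to block on a serial read
    return f'{port_name}: data'


async def demo():
    pending = slow_read('COM5')         # nothing has run yet
    was_coroutine = inspect.iscoroutine(pending)
    return was_coroutine, await pending
```

`asyncio.run(demo())` returns `(True, 'COM5: data')`.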
### Finally being able to await feels great

    msg = await get(ser)  # --> await

Used inside a producer:

    import asyncio
    import random

    from serial import Serial

    async def produce(queue, url, **kwargs):
        ser = Serial(url, baudrate=9600)
        while True:
            msg = await get(ser)  # --> await
            print(f"produce id: {id(msg)}, device:{msg.split(',')[2:4]}, msg:{repr(msg)}")
            await queue.put(msg)
            await asyncio.sleep(random.random())

### Everyone has had the same idea: side project time!
- [awesome asyncio](https://github.com/timofurrer/awesome-asyncio)
- [aioserial](https://github.com/changyuheng/aioserial)
For example:

    import aioserial
    import asyncio

    async def read_and_print(aioserial_instance: aioserial.AioSerial):
        while True:
            print((await aioserial_instance.read_async()).decode(errors='ignore'), end='', flush=True)

### Thank you! :sheep:
You can find me on
- [GitHub](https://github.com/chairco)
- [Twitter](https://twitter.com/home?lang=zh-tw)
- or [email](mailto:chairco@gmail.com) me
