![Imgur](https://i.imgur.com/gLo7wEY.jpg)
## async def Serial() await Serial_connection
<!-- Put the link to this slide here so people can follow -->
<style>
.reveal {
font-size: 26px;
font-family: -apple-system, BlinkMacSystemFont, Roboto, "Helvetica Neue", Helvetica, Arial, PingFangTC-Light, "Microsoft JhengHei", "微軟正黑", sans-serif, "Apple Color Emoji";
}
h1, h2 {
color: #000000 !important;
}
h3, h4, h5, h6 {
color: #40aabd !important;
}
code.blue {
color: #337AB7 !important;
}
code.orange {
color: #F7A004 !important;
}
code.green {
color: #40aabd !important;
}
</style>
PyConTW 2019
https://reurl.cc/M7ZKbm
---
<!-- .slide: data-background="https://s3.amazonaws.com/hakim-static/reveal-js/reveal-parallax-1.jpg" -->
## <code class="green">感謝大家參與這場 Session</code>
Note:
r0: threading, GIL
r2: asyncio 性能分析器
---
## Who am I?
- Jason
- 軟體工程師?
- Python :heart:
- ~~I use tabs.~~ :cat:
Note:
是軟體工程師->這個世界進步的太快了開始懷疑自己
---
### 演講將分享
+ 如何使用 asyncio 搭建一個 RS232 的序列阜非同步程式經驗
+ 除了使用 protocol 和 transport 還有其他方法?
Note:
有人接過 rs232 資料嗎?
寫過 async 程式嗎?
---
### 緣由
+ 視覺檢測機器產生數據做資料分析
+ RS232 控制與傳輸資料
+ 一台電腦同時接收兩台攝影機的數據
Note:
一台嵌入式aoi檢測機器
想要分析檢測數據
---
### 序列阜
![serialport](http://3.bp.blogspot.com/_BTnd7TC5mJw/TD0RKEOLRRI/AAAAAAAAAzs/vj9o8Wjm85c/s320/%E5%BA%8F%E5%88%97%E5%9F%A0%E8%88%87%E4%B8%A6%E5%88%97%E5%9F%A0.png)
+ 一種電腦上的通訊接口
+ 常見有 RS232、USB 和 RJ45
Note:
序列埠按電氣標準及協定來分,包括RS-232-C、RS-422、RS485、USB等。 RS-232-C、RS-422與RS-485標準只對埠的電氣特性做出規定,不涉及接外掛程式、電纜或協定。USB是近幾年發展起來的新型埠標準,主要應用於高速資料傳輸領域。
----
![Imgur](https://i.imgur.com/BWl2QJn.png)
+ UART 轉接 RS232
Note:
通用非同步收發傳輸器(Universal Asynchronous Receiver/Transmitter,通常稱為UART)是一種異步收發傳輸器,是電腦硬體的一部分,將數據通過串行通信和並行通信間作傳輸轉換。UART通常用在與其他通信接口(如EIA RS-232)的連接上。具體實物表現為獨立的模組化晶片,或是微處理器中的內部周邊裝置(peripheral)。一般和RS-232C規格的,類似Maxim的MAX232之類的標準信號幅度變換晶片進行搭配,作為連接外部設備的接口。
---
### Producer - comsumer Problem
+ Producer 不斷去讀 serial port 有資料產生就方入 Queue
+ Consumer 檢查 Queue 有資料就執行 API 存入 DB,失敗再放回 Queue 中
```graphviz
digraph {
compound=true
rankdir=RL
graph [ fontname="Source Sans Pro", fontsize=20 ];
node [ fontname="Source Sans Pro", fontsize=18];
edge [ fontname="Source Sans Pro", fontsize=12 ];
subgraph core {
c [label="core \nFIFO"] [shape=box]
}
c -> Queue [ltail=session lhead=session]
subgraph cluster1 {
concentrate=true
a [label="Producer\n"] [shape=box]
b [label="Consumer"] [shape=box]
Queue [label="Queue" shape=plaintext ]
b -> Queue [label="push, pull", dir="both"]
a -> Queue [label="push", fontcolor=darkgreen]
label="asynchronous"
}
}
```
Note:
概念上不知道資料何時才進,所以想設計一個comsumer-producer mode
---
### 選擇套件
![#](https://pyserial.readthedocs.io/en/latest/_static/pyserial.png)
```python=
from serial import Serial
with Serial('/dev/ttyS1', 19200, timeout=1) as ser:
x = ser.read() # read one byte
s = ser.read(10) # read up to ten bytes (timeout)
line = ser.readline() # read a '\n' terminated line
```
ex. Win 的 port 表達方式 `COM2, COM3`
Note:
pyserial 是一個很不錯的套件
但是一個同步的方式讀取
---
### Synchronous 的問題
+ While 迴圈等待資料
+ [多執行緒](https://github.com/chairco/asyncio-pySerial/blob/master/thread.py) -GIL
+ 更潮的做法(2019 算潮嗎?)
Note:
同步方式其實很簡單也很好,但會有一些問題像是i/o問題可能會等待
例如 time.sleep,或許多執行緒可解決但執行緒是在cpu層的,頻繁 content switch 其實會耗費時間
---
### Asynchronous 夠潮嗎?
- 非阻塞事件驅動
- 非阻塞式 Coroutine
- asyncio
Note:
非同步有很多策略,今天講的是其中兩種,然後會使用 asyncio
~~阻塞式多行程~~
~~阻塞式多行程多執行緒~~
非阻塞式事件驅動
非阻塞式 Coroutine
Non-blocking I/O
Callback -> coroutine
Event loop
---
看了 asyncio 文件,依樣畫葫蘆
```python=
def consumer():
with serial.Serial(com, port, timeout=0) as ser:
buffer = ''
while True:
buffer += ser.readline().decode()
if '\r' in buffer:
buf = buffer.split('\r')
last_received, buffer = buffer.split('\r')[-2:]
yield last_received
```
----
```python=
async def async_producer(session, q):
start = time.time()
while len(q):
data = q.popleft()
status, result = await async_post(session, url_seq, url_films, data)
if status != 201:
q.append(data)
if time.time() - start >= 1:
break
```
----
```python=
q = deque()
com = 'COM5'
port = 9600
last_received = ''
async def async_main():
global q
async with aiohttp.ClientSession() as session:
for data in consumer():
q.append(data)
d = q.popleft()
status, result = await async_post(session, url_seq, url_films, data=d) # post data
if status != 201:
q.append(d)
if len(q) >= 2:
await async_producer(session, q)
loop = asyncio.get_event_loop()
loop.run_until_complete(async_main())
loop.close()
```
----
### 錯誤在哪裡?
- 會 Blocking
- 不是非同步,是同步
- Event loop, coroutine, Future, Task
- 錯誤[範例](https://github.com/chairco/kaohsiung.py_talk/blob/master/20180918_Data_Engineering_at_Traditional_Factory/async_sample/wrong_async.py)
- 正確[範例](https://github.com/chairco/kaohsiung.py_talk/blob/master/20180918_Data_Engineering_at_Traditional_Factory/async_sample/rignt_async.py)
----
### 說明
```python=
async def async_main():
global q # --> 錯誤 (1)
async with aiohttp.ClientSession() as session:
for data in consumer(): # --> 錯誤 (2)
q.append(data)
d = q.popleft()
status, result = await async_post(session, url_seq, url_films, data=d) # post data
if status != 201:
q.append(d)
if len(q) >= 2:
await async_producer(session, q)
loop = asyncio.get_event_loop()
loop.run_until_complete(async_main())
loop.close()
```
Note:
眼睛業障重快跳掉
---
### 不如來看看別人怎麼寫的?
- 創造源自於模仿 :bulb:
---
### PySerial-asyncio :fork_and_knife:
![#](https://pyserial-asyncio.readthedocs.io/en/latest/_static/pyserial-asyncio.png)
- pyserial 底下的專案
- 支援 3.4 之後
----
![#](https://i.imgur.com/1kdcKqE.png)
----
### 快快樂樂非同步,真的很簡單的 :+1:
```python=
import asyncio
import serial_asyncio
class Output(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
print('port opened', transport)
transport.serial.rts = False
transport.write(b'hello world\n')
def data_received(self, data):
print('data received', repr(data))
self.transport.close()
def connection_lost(self, exc):
print('port closed')
asyncio.get_event_loop().stop()
loop = asyncio.get_event_loop()
coro = serial_asyncio.create_serial_connection(loop, Output, '/dev/ttyUSB0', baudrate=115200)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()
```
---
### 有些地方似乎不太明白?
- Output 繼承 asyncio.porotocol?
- create_serial_connection 回傳一個 coroutine?
---
### <code class="orange">Protocols</code>, <code class="blue">Transports</code>
- [PEP 3156](https://www.python.org/dev/peps/pep-3156/#transports-and-protocols), [PEP 3153](https://www.python.org/dev/peps/pep-3153/)
- low-level api
- callback-based
- should only be used in libraries and frameworks and never in high-level asyncio applications
Note:
源自於 PEP 3153, 3156
callback-based api
建議用於函式庫或是框架不要用在 high level 的 application
然後我們來看一下官方文件對於這兩個 api 介紹
----
### <code class="orange">Protocols</code>, <code class="blue">Transports</code>
- At the highest level, the transport is concerned with how bytes are transmitted, while the protocol determines which bytes to transmit (and to some extent when).
- A different way of saying the same thing: a transport is an abstraction for a socket (or similar I/O endpoint) while a protocol is an abstraction for an application, from the transport’s point of view.
Note:
最高層傳輸只關心 怎樣 傳送位元組內容,而協議決定傳送 哪些 位元組內容(還要在一定程度上考慮何時)
從傳輸的角度來看,傳輸是抽像化socket(或類似的I/O endpoint),而協議是抽象化應用程序的。
transport 定義傳輸方式而 protocol 定義要傳輸哪些字元
transport 抽象 i/o endpoint, protocol 抽像應用
----
### <code class="orange">Protocols</code>, <code class="blue">Transports</code>
- Yet another view is the transport and protocol interfaces together define an abstract interface for using network I/O and interprocess I/O.
- There is always a 1:1 relationship between transport and protocol objects: the protocol calls transport methods to send data, while the transport calls protocol methods to pass it data that has been received.
Note:
t和p一起定義網路I/0和行程通訊之間I/O的抽象接口。
t對象和p對象總是一對一關係:protocol調用transport方法來發送資料,而transport在接收到資料時時調用protocl方法傳遞資料。
兩者共同定義一個使用網路i/o和行程通訊i/o的抽象接口
彼此存在1:1關係,protocol 呼叫 transport 方法傳送資料,transport 則呼叫 protocol 來傳遞已經接收的數據
----
### <code class="orange">Protocols</code>, <code class="blue">Transports</code>
- Most of connection oriented event loop methods (such as loop.create_connection()) usually accept a protocol_factory argument used to create a Protocol object for an accepted connection, represented by a Transport object. Such methods usually return a tuple of (transport, protocol).
Note:
大部分面向連接的事件循環方法(如 loop.create_connection()) 通常接受 protocol_factory 參數為接收到的鏈接創建 p 對象,並用 t 對象來表示。這些方法一般會返回 (transport, protocol) 元組。
----
### <code class="orange">Protocols</code>
- represent applications such as HTTP client/server, SMTP and FTP
- Async http operation
- loop.create_connection()
<br></br>
### <code class="blue">Transports</code>
- represent connections sush a sockets, SSL connection and pipes
- Async socket operations
- ususally frameworks implements e.g. Tornado
Note:
protocol: application 應用程序, 非同步 http, loop.create_connection()
Tranports: socket connection
---
### Layers
![Imgur](https://i.imgur.com/dm4jtqw.png)
[What Is Async, How Does It Work, And When Should I Use It? -A. Jesse Jiryu Davis](https://www.slideshare.net/emptysquare/what-is-async-how-does-it-work-and-when-should-i-use-it)
Note:
用 Jesse 一張很淺顯的圖片,Protocol 就是 websocket, socket 就是 transport
---
### 進入核心! :100: :muscle: :tada:
Note:
有了這個概念之後我們來看看 pyserial_asyncio api
---
```python=
class SerialTransport(asyncio.Transport):
...
```
<br>
```python=
@asyncio.coroutine
def create_serial_connection(loop, protocol_factory, *args, **kwargs):
...
```
<br>
```python=
@asyncio.coroutine
def open_serial_connection(*,loop=None,
limit=asyncio.streams._DEFAULT_LIMIT,
**kwargs):
...
```
Note:
共有一個 class 兩個 function
---
### class SerialTransport
- Low level api
- asyncio.Transport 子類別
- 實現非同步第一種方法
- 建立一個 `asyncio.Protocol` 子類別然後使用 `SerialTransport` 作為 `transport`
Note:
接下來來介紹第一種方法
一個asyncio.transport的子類別
然後我們要建立一個asyncio.Protocol子類別然後使用SerialTransport作為transport
---
### <code class="orange">Protocol</code>
- 透過 callbacks 方法告訴 protocol 如何操作
- transport 來處理 callbacks 來回應事件
- connections opened
- data arrives
- 預設的 callbacks 是空值,所以只要複寫一些感興趣的 methods
- connection_made
- connection_lost
- data_received
Note:
connection_made:開始進行各種不同通訊協定操作非同步i/o方法,單獨的傳輸類用於處理socket和用於處理子進程的管道
data_received:接收數據
connection_lost:關閉連接時會被呼叫
----
### API
```python=
class SerialTransport(asyncio.Transport):
def __init__(self, loop, protocol, *args, **kwargs):
self._loop = loop
self._protocol = protocol
def get_protocol(self):
return self._protocol
def set_protocol(self, protocol):
return self._protocol
# Implement read/write methods
# [...]
```
----
### API
```python=
async def ser_create_connection(protocol_factory, *args, **kwargs):
loop = asyncio.get_event_loop()
protocol = protocol_factory()
transport = SerialTransport(loop, protocol, *args, **kwargs)
return transport, protocol
```
----
### Protocol 範例(Reader)
```python=
import asyncio
import serial_asyncio
import random
from functools import partial
com = '/dev/cu.usbmodem1411' #'/dev/ttys006'
baudrate = 9600
class Reader(asyncio.Protocol):
"""
"""
def __init__(self, queue):
"""Store the queue.
"""
super().__init__()
self.transport = None
self.buf = None
self.queue = queue
def connection_made(self, transport):
"""Store the serial transport and prepare to receive data.
"""
self.transport = transport
self.buf = bytes()
print('port opend', transport)
def data_received(self, data):
"""Store characters until a newline is received.
"""
self.buf += data
if b'\r' in self.buf:
lines = self.buf.split(b'\r')
recv, self.buf = lines[-2:] # whatever was left over
data = recv.strip()
asyncio.ensure_future(self.queue.put(data))
self.buf.strip()
print(f'producing: {id(data)}')
def connection_lost(self, exc):
print('Reader closed')
async def consume(queue):
"""Get serail data with async
"""
while True:
data = await queue.get()
print(f'consuming: {id(data)}')
await asyncio.sleep(random.random())
queue.task_done()
loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)
produce = partial(Reader, queue)
producer_coro = serial_asyncio.create_serial_connection(
loop, produce, com, baudrate
)
consumer_coro = consume(queue)
loop.run_until_complete(asyncio.gather(producer_coro, consumer_coro))
loop.close()
```
---
### Streams
- open_serial_connection() 是 high level async/await-ready primitives to work with network connections.
- Streams allow sending and receiving data without using callbacks or low-level protocols and transports.
- 會產生一個成對的 `asyncio.StreamReader` / `asyncio.StreamWriter`
- 需要建立一個 read, write 的 coroutine 函式來處理字串
Note:
High level async/await 用來原生連接網路
不需要回呼或是low-level protocol and transport 就可以發送和接收資料
----
### API
```python=
async def ser_open_connection(*args, **kwargs):
reader = asyncio.streams.StreamReader()
protocol = asyncio.streams.StreamReaderProtocol(reader)
factory = lambda: protocol
transport, _ = await ser_create_connection(factory, *args, **kwargs)
writer = asyncio.streams.StreamWriter(transport, protocol, reader)
return reader, writer
```
----
### Streams 範例
```python=
import asyncio
import serial_asyncio
import random
# serial setting
url = '/dev/cu.usbmodem1421'
url2 = '/dev/cu.usbserial-AL016RPE'
port = 9600
async def produce(queue, url, **kwargs):
"""get serial data use recv() define format with non-blocking
"""
reader, writer = await serial_asyncio.open_serial_connection(url=url, **kwargs)
buffers = recv(reader)
async for buf in buffers:
# TODO: can handle data format here
print(f"produce id: {id(buf)}, device:{buf.split(',')[2:4]}")
await asyncio.sleep(random.random())
await queue.put(buf)
async def recv(r):
"""
Handle stream data with different StreamReader:
'read', 'readexactly', 'readuntil', or 'readline'
"""
while True:
msg = await r.readuntil(b'\r')
yield msg.rstrip().decode('utf-8')
async def consume(queue):
"""
consume serial data from queue
"""
while True:
# wait for an data from producer
data = await queue.get()
# process the data
print(f"consuming id: {id(data)}")
# simulate i/o operation using sleep
await asyncio.sleep(random.random())
# Notify the queue that the item has been processed
queue.task_done()
loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)
producer_coro = produce(queue, url=url, baudrate=port)
producer_coro2 = produce(queue, url=url2, baudrate=port)
consumer_coro = consume(queue)
loop.run_until_complete(asyncio.gather(producer_coro, producer_coro2, consumer_coro))
loop.close()
```
---
### 成功讓 serial port 非同步的讀與寫的控制 :100:
---
### 可能會遇到問題
- 別人開發 api 不支持 windows
- 不支持 asyncio 的套件
- 有沒有其他方式?
---
### Coroutine
- Generator
- Future
- Task
Note:
協同程序,從這幾個 compoments 快速帶過
----
### Generator
- __iter__, __next__
- 語法 yield
- 控制流程
- PEP 342: 語句變成表達式 x = yield
- 產生器演化成協同程序
----
### Future
- 記錄要工作狀態
- 將來執行或沒有執行的任務的結果
- 類似 Promise
----
### Task
- 事件循環交互,執行協同程序任務
- 一個協程對象就是一個原生可以暫停執行的函數,任務則是對協程進一步封裝,其中包含任務的各種狀態。Task 對象是 Future 的子類,它將 coroutine 和 Future 聯繫在一起,將 coroutine 封裝成一個 Future 對象。
---
### Async and sync 兩個世界的轉換
- Andrew Godwin - [Python & Async Simplified](https://www.aeracode.org/2018/02/19/python-async-simplified/)
- Django ASGI - [asgiref](https://github.com/django/asgiref)
- [Executing code in thread or process pool](https://docs.python.org/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools)
![Imgur](https://i.imgur.com/EZ2PMMb.png)
---
### 舉一個 Django ORM 範例
```python=
def get_chat_id(name):
return Chat.objects.get(name=name).id
async def main():
executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(executor, get_chat_id, "django")
```
Note:
從非同步函數中呼叫同步函數很危險,因為你不確定這是不是一個會阻塞的函數
---
### 想法與流程大概是
- synchronous callable 變成 awaitable
- 建立 concurrent.futures.ThreadPoolExecutor 讓 synchronous 跑在上面
- 建立一個 loop 並將 threadpool 的 executor 放入 loop.run_in_executor,執行完畢就會回傳一個 asyncio.Future object
- 當是一個 asyncio.Future object 想當然爾就可以 await 了
p.s 上面的 futures 和 event_loop 是不同 thread 所以不需要排進 loop 內,直接用 run_in_executor 去跑 multi-thread
---
## 接著用這個想法做個範例
----
### 建立一個會 blocking 的函式
```python=
from serial import Serial
from concurrent import futures
import asyncio
import time
async def run_blocking_tasks(executor):
s = Serial('/dev/ttys005', baudrate=9600)
msgs = [b'foo\n', b'bar\n', b'baz\n', b'qux\n', b'DONE\n']
loop = asyncio.get_event_loop()
f = [
loop.run_in_executor(executor, send, s, m)
for m in msgs
]
await asyncio.wait(f)
```
----
### 傳送訊息的 func
```python=
def send(ser, msg):
ser.write(msg)
print(f'sending: {msg}')
if msg.rstrip().decode() == 'foo':
time.sleep(3)
print(f'{msg.rstrip().decode()} sleep 3s')
```
----
### 建立一個 ThreadPoolExecutor 然後用 asyncio.run() 把 event loop 掛起來
```python=
if __name__ == '__main__':
executor = futures.ThreadPoolExecutor(max_workers=1)
asyncio.run(run_blocking_tasks(executor))
```
----
### 執行結果
會發現原本 sleep 並不會讓程式阻塞住
```
MainThread run_blocking_tasks: starting
MainThread run_blocking_tasks: creating executor tasks
ThreadPoolExecutor-0_0 sending: b'foo\n': running
ThreadPoolExecutor-0_1 sending: b'bar\n': running
ThreadPoolExecutor-0_2 sending: b'baz\n': running
ThreadPoolExecutor-0_1 sending: b'qux\n': running
MainThread run_blocking_tasks: waiting for executor tasks
ThreadPoolExecutor-0_2 sending: b'DONE\n': running
ThreadPoolExecutor-0_0 sending: b'foo\n': foo finish sleep 3s ---> “最後才完成”
MainThread run_blocking_tasks: exiting
```
---
### Sync -> Async, wrap 起來呢?
```python=
@sync_to_async
def get(ser):
msg = ser.read_until(b'\r\n')
return msg.rstrip().decode('utf-8')
```
經過 @sync_to_async wrap 後變成一個 coroutine, 而且會是一個 await
> <coroutine object SyncToAsync.__call__ at 0x1091ab1c8>
>> __main__:2: RuntimeWarning: coroutine 'SyncToAsync.__call__' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
----
### 裝飾器的說明
```python=
class SyncToAsync:
def __init__(self):
....
async def __call__(self, *args, **kwargs):
loop = asyncio.get_event_loop()
future = loop.run_in_executor(
None,
functools.partial(self.thread_handler, loop, *args, **kwargs),
)
return await asyncio.wait_for(future, timeout=None)
sync_to_async = SyncToAsync
```
----
### 最後可以 await 就 4 爽
```python=
msg = await get(ser) #--> await
```
```python=
import asyncio
from serial import Serial
async def produce(queue, url, **kwargs):
ser = Serial(url, baudrate=9600)
while True:
msg = await get(ser) #--> await
print(f"produce id: {id(msg)}, device:{msg.split(',')[2:4]}, msg:{repr(msg)}")
await queue.put(msg)
await asyncio.sleep(random.random())
```
---
### 大家都有同樣想法:side project 時間!
- [awesome asyncio](https://github.com/timofurrer/awesome-asyncio)
- [aioserial](https://github.com/changyuheng/aioserial)
----
```python=
import aioserial
import asyncio
async def read_and_print(aioserial_instance: aioserial.AioSerial):
while True:
print((await aioserial_instance.read_async()).decode(errors='ignore'), end='', flush=True)
asyncio.run(read_and_print(aioserial.AioSerial(port='COM1')))
```
---
### Thank you! :sheep:
You can find me on
- [GitHub](https://github.com/chairco)
- [Twitter](https://twitter.com/home?lang=zh-tw)
- or [email](chairco@gmail.com) me
{"metaMigratedAt":"2023-06-15T00:02:06.774Z","metaMigratedFrom":"YAML","title":"async def Serial() await Serial_connection","breaks":true,"description":"View the slide with \"Slide Mode\".","slideOptions":"{\"theme\":\"solarized\",\"transition\":\"fade\",\"allottedMinutes\":28,\"spotlight\":{\"enabled\":false}}","contributors":"[{\"id\":\"52fcffbd-98fb-4986-a324-085d370c6514\",\"add\":28767,\"del\":7672}]"}