![Imgur](https://i.imgur.com/gLo7wEY.jpg) ## async def Serial() await Serial_connection <!-- Put the link to this slide here so people can follow --> <style> .reveal { font-size: 26px; font-family: -apple-system, BlinkMacSystemFont, Roboto, "Helvetica Neue", Helvetica, Arial, PingFangTC-Light, "Microsoft JhengHei", "微軟正黑", sans-serif, "Apple Color Emoji"; } h1, h2 { color: #000000 !important; } h3, h4, h5, h6 { color: #40aabd !important; } code.blue { color: #337AB7 !important; } code.orange { color: #F7A004 !important; } code.green { color: #40aabd !important; } </style> PyConTW 2019 https://reurl.cc/M7ZKbm --- <!-- .slide: data-background="https://s3.amazonaws.com/hakim-static/reveal-js/reveal-parallax-1.jpg" --> ## <code class="green">感謝大家參與這場 Session</code> Note: r0: threading, GIL r2: asyncio 性能分析器 --- ## Who am I? - Jason - 軟體工程師? - Python :heart: - ~~I use tabs.~~ :cat: Note: 是軟體工程師->這個世界進步的太快了開始懷疑自己 --- ### 演講將分享 + 如何使用 asyncio 搭建一個 RS232 的序列阜非同步程式經驗 + 除了使用 protocol 和 transport 還有其他方法? Note: 有人接過 rs232 資料嗎? 寫過 async 程式嗎? --- ### 緣由 + 視覺檢測機器產生數據做資料分析 + RS232 控制與傳輸資料 + 一台電腦同時接收兩台攝影機的數據 Note: 一台嵌入式aoi檢測機器 想要分析檢測數據 --- ### 序列阜 ![serialport](http://3.bp.blogspot.com/_BTnd7TC5mJw/TD0RKEOLRRI/AAAAAAAAAzs/vj9o8Wjm85c/s320/%E5%BA%8F%E5%88%97%E5%9F%A0%E8%88%87%E4%B8%A6%E5%88%97%E5%9F%A0.png) + 一種電腦上的通訊接口 + 常見有 RS232、USB 和 RJ45 Note: 序列埠按電氣標準及協定來分,包括RS-232-C、RS-422、RS485、USB等。 RS-232-C、RS-422與RS-485標準只對埠的電氣特性做出規定,不涉及接外掛程式、電纜或協定。USB是近幾年發展起來的新型埠標準,主要應用於高速資料傳輸領域。 ---- ![Imgur](https://i.imgur.com/BWl2QJn.png) + UART 轉接 RS232 Note: 通用非同步收發傳輸器(Universal Asynchronous Receiver/Transmitter,通常稱為UART)是一種異步收發傳輸器,是電腦硬體的一部分,將數據通過串行通信和並行通信間作傳輸轉換。UART通常用在與其他通信接口(如EIA RS-232)的連接上。具體實物表現為獨立的模組化晶片,或是微處理器中的內部周邊裝置(peripheral)。一般和RS-232C規格的,類似Maxim的MAX232之類的標準信號幅度變換晶片進行搭配,作為連接外部設備的接口。 --- ### Producer - comsumer Problem + Producer 不斷去讀 serial port 有資料產生就方入 Queue + Consumer 檢查 Queue 有資料就執行 API 存入 DB,失敗再放回 Queue 中 ```graphviz digraph { compound=true rankdir=RL graph [ fontname="Source Sans Pro", fontsize=20 ]; node [ fontname="Source Sans Pro", fontsize=18]; edge [ fontname="Source Sans Pro", fontsize=12 ]; subgraph core { c [label="core \nFIFO"] [shape=box] } c -> Queue [ltail=session lhead=session] subgraph cluster1 { concentrate=true a [label="Producer\n"] [shape=box] b [label="Consumer"] [shape=box] Queue [label="Queue" shape=plaintext ] b -> Queue [label="push, pull", dir="both"] a -> Queue [label="push", fontcolor=darkgreen] label="asynchronous" } } ``` Note: 概念上不知道資料何時才進,所以想設計一個comsumer-producer mode --- ### 選擇套件 ![#](https://pyserial.readthedocs.io/en/latest/_static/pyserial.png) ```python= from serial import Serial with Serial('/dev/ttyS1', 19200, timeout=1) as ser: x = ser.read() # read one byte s = ser.read(10) # read up to ten bytes (timeout) line = ser.readline() # read a '\n' terminated line ``` ex. Win 的 port 表達方式 `COM2, COM3` Note: pyserial 是一個很不錯的套件 但是一個同步的方式讀取 --- ### Synchronous 的問題 + While 迴圈等待資料 + [多執行緒](https://github.com/chairco/asyncio-pySerial/blob/master/thread.py) -GIL + 更潮的做法(2019 算潮嗎?) Note: 同步方式其實很簡單也很好,但會有一些問題像是i/o問題可能會等待 例如 time.sleep,或許多執行緒可解決但執行緒是在cpu層的,頻繁 content switch 其實會耗費時間 --- ### Asynchronous 夠潮嗎? - 非阻塞事件驅動 - 非阻塞式 Coroutine - asyncio Note: 非同步有很多策略,今天講的是其中兩種,然後會使用 asyncio ~~阻塞式多行程~~ ~~阻塞式多行程多執行緒~~ 非阻塞式事件驅動 非阻塞式 Coroutine Non-blocking I/O Callback -> coroutine Event loop --- 看了 asyncio 文件,依樣畫葫蘆 ```python= def consumer(): with serial.Serial(com, port, timeout=0) as ser: buffer = '' while True: buffer += ser.readline().decode() if '\r' in buffer: buf = buffer.split('\r') last_received, buffer = buffer.split('\r')[-2:] yield last_received ``` ---- ```python= async def async_producer(session, q): start = time.time() while len(q): data = q.popleft() status, result = await async_post(session, url_seq, url_films, data) if status != 201: q.append(data) if time.time() - start >= 1: break ``` ---- ```python= q = deque() com = 'COM5' port = 9600 last_received = '' async def async_main(): global q async with aiohttp.ClientSession() as session: for data in consumer(): q.append(data) d = q.popleft() status, result = await async_post(session, url_seq, url_films, data=d) # post data if status != 201: q.append(d) if len(q) >= 2: await async_producer(session, q) loop = asyncio.get_event_loop() loop.run_until_complete(async_main()) loop.close() ``` ---- ### 錯誤在哪裡? - 會 Blocking - 不是非同步,是同步 - Event loop, coroutine, Future, Task - 錯誤[範例](https://github.com/chairco/kaohsiung.py_talk/blob/master/20180918_Data_Engineering_at_Traditional_Factory/async_sample/wrong_async.py) - 正確[範例](https://github.com/chairco/kaohsiung.py_talk/blob/master/20180918_Data_Engineering_at_Traditional_Factory/async_sample/rignt_async.py) ---- ### 說明 ```python= async def async_main(): global q # --> 錯誤 (1) async with aiohttp.ClientSession() as session: for data in consumer(): # --> 錯誤 (2) q.append(data) d = q.popleft() status, result = await async_post(session, url_seq, url_films, data=d) # post data if status != 201: q.append(d) if len(q) >= 2: await async_producer(session, q) loop = asyncio.get_event_loop() loop.run_until_complete(async_main()) loop.close() ``` Note: 眼睛業障重快跳掉 --- ### 不如來看看別人怎麼寫的? - 創造源自於模仿 :bulb: --- ### PySerial-asyncio :fork_and_knife: ![#](https://pyserial-asyncio.readthedocs.io/en/latest/_static/pyserial-asyncio.png) - pyserial 底下的專案 - 支援 3.4 之後 ---- ![#](https://i.imgur.com/1kdcKqE.png) ---- ### 快快樂樂非同步,真的很簡單的 :+1: ```python= import asyncio import serial_asyncio class Output(asyncio.Protocol): def connection_made(self, transport): self.transport = transport print('port opened', transport) transport.serial.rts = False transport.write(b'hello world\n') def data_received(self, data): print('data received', repr(data)) self.transport.close() def connection_lost(self, exc): print('port closed') asyncio.get_event_loop().stop() loop = asyncio.get_event_loop() coro = serial_asyncio.create_serial_connection(loop, Output, '/dev/ttyUSB0', baudrate=115200) loop.run_until_complete(coro) loop.run_forever() loop.close() ``` --- ### 有些地方似乎不太明白? - Output 繼承 asyncio.porotocol? - create_serial_connection 回傳一個 coroutine? --- ### <code class="orange">Protocols</code>, <code class="blue">Transports</code> - [PEP 3156](https://www.python.org/dev/peps/pep-3156/#transports-and-protocols), [PEP 3153](https://www.python.org/dev/peps/pep-3153/) - low-level api - callback-based - should only be used in libraries and frameworks and never in high-level asyncio applications Note: 源自於 PEP 3153, 3156 callback-based api 建議用於函式庫或是框架不要用在 high level 的 application 然後我們來看一下官方文件對於這兩個 api 介紹 ---- ### <code class="orange">Protocols</code>, <code class="blue">Transports</code> - At the highest level, the transport is concerned with how bytes are transmitted, while the protocol determines which bytes to transmit (and to some extent when). - A different way of saying the same thing: a transport is an abstraction for a socket (or similar I/O endpoint) while a protocol is an abstraction for an application, from the transport’s point of view. Note: 最高層傳輸只關心 怎樣 傳送位元組內容,而協議決定傳送 哪些 位元組內容(還要在一定程度上考慮何時) 從傳輸的角度來看,傳輸是抽像化socket(或類似的I/O endpoint),而協議是抽象化應用程序的。 transport 定義傳輸方式而 protocol 定義要傳輸哪些字元 transport 抽象 i/o endpoint, protocol 抽像應用 ---- ### <code class="orange">Protocols</code>, <code class="blue">Transports</code> - Yet another view is the transport and protocol interfaces together define an abstract interface for using network I/O and interprocess I/O. - There is always a 1:1 relationship between transport and protocol objects: the protocol calls transport methods to send data, while the transport calls protocol methods to pass it data that has been received. Note: t和p一起定義網路I/0和行程通訊之間I/O的抽象接口。 t對象和p對象總是一對一關係:protocol調用transport方法來發送資料,而transport在接收到資料時時調用protocl方法傳遞資料。 兩者共同定義一個使用網路i/o和行程通訊i/o的抽象接口 彼此存在1:1關係,protocol 呼叫 transport 方法傳送資料,transport 則呼叫 protocol 來傳遞已經接收的數據 ---- ### <code class="orange">Protocols</code>, <code class="blue">Transports</code> - Most of connection oriented event loop methods (such as loop.create_connection()) usually accept a protocol_factory argument used to create a Protocol object for an accepted connection, represented by a Transport object. Such methods usually return a tuple of (transport, protocol). Note: 大部分面向連接的事件循環方法(如 loop.create_connection()) 通常接受 protocol_factory 參數為接收到的鏈接創建 p 對象,並用 t 對象來表示。這些方法一般會返回 (transport, protocol) 元組。 ---- ### <code class="orange">Protocols</code> - represent applications such as HTTP client/server, SMTP and FTP - Async http operation - loop.create_connection() <br></br> ### <code class="blue">Transports</code> - represent connections sush a sockets, SSL connection and pipes - Async socket operations - ususally frameworks implements e.g. Tornado Note: protocol: application 應用程序, 非同步 http, loop.create_connection() Tranports: socket connection --- ### Layers ![Imgur](https://i.imgur.com/dm4jtqw.png) [What Is Async, How Does It Work, And When Should I Use It? -A. Jesse Jiryu Davis](https://www.slideshare.net/emptysquare/what-is-async-how-does-it-work-and-when-should-i-use-it) Note: 用 Jesse 一張很淺顯的圖片,Protocol 就是 websocket, socket 就是 transport --- ### 進入核心! :100: :muscle: :tada: Note: 有了這個概念之後我們來看看 pyserial_asyncio api --- ```python= class SerialTransport(asyncio.Transport): ... ``` <br> ```python= @asyncio.coroutine def create_serial_connection(loop, protocol_factory, *args, **kwargs): ... ``` <br> ```python= @asyncio.coroutine def open_serial_connection(*,loop=None, limit=asyncio.streams._DEFAULT_LIMIT, **kwargs): ... ``` Note: 共有一個 class 兩個 function --- ### class SerialTransport - Low level api - asyncio.Transport 子類別 - 實現非同步第一種方法 - 建立一個 `asyncio.Protocol` 子類別然後使用 `SerialTransport` 作為 `transport` Note: 接下來來介紹第一種方法 一個asyncio.transport的子類別 然後我們要建立一個asyncio.Protocol子類別然後使用SerialTransport作為transport --- ### <code class="orange">Protocol</code> - 透過 callbacks 方法告訴 protocol 如何操作 - transport 來處理 callbacks 來回應事件 - connections opened - data arrives - 預設的 callbacks 是空值,所以只要複寫一些感興趣的 methods - connection_made - connection_lost - data_received Note: connection_made:開始進行各種不同通訊協定操作非同步i/o方法,單獨的傳輸類用於處理socket和用於處理子進程的管道 data_received:接收數據 connection_lost:關閉連接時會被呼叫 ---- ### API ```python= class SerialTransport(asyncio.Transport): def __init__(self, loop, protocol, *args, **kwargs): self._loop = loop self._protocol = protocol def get_protocol(self): return self._protocol def set_protocol(self, protocol): return self._protocol # Implement read/write methods # [...] ``` ---- ### API ```python= async def ser_create_connection(protocol_factory, *args, **kwargs): loop = asyncio.get_event_loop() protocol = protocol_factory() transport = SerialTransport(loop, protocol, *args, **kwargs) return transport, protocol ``` ---- ### Protocol 範例(Reader) ```python= import asyncio import serial_asyncio import random from functools import partial com = '/dev/cu.usbmodem1411' #'/dev/ttys006' baudrate = 9600 class Reader(asyncio.Protocol): """ """ def __init__(self, queue): """Store the queue. """ super().__init__() self.transport = None self.buf = None self.queue = queue def connection_made(self, transport): """Store the serial transport and prepare to receive data. """ self.transport = transport self.buf = bytes() print('port opend', transport) def data_received(self, data): """Store characters until a newline is received. """ self.buf += data if b'\r' in self.buf: lines = self.buf.split(b'\r') recv, self.buf = lines[-2:] # whatever was left over data = recv.strip() asyncio.ensure_future(self.queue.put(data)) self.buf.strip() print(f'producing: {id(data)}') def connection_lost(self, exc): print('Reader closed') async def consume(queue): """Get serail data with async """ while True: data = await queue.get() print(f'consuming: {id(data)}') await asyncio.sleep(random.random()) queue.task_done() loop = asyncio.get_event_loop() queue = asyncio.Queue(loop=loop) produce = partial(Reader, queue) producer_coro = serial_asyncio.create_serial_connection( loop, produce, com, baudrate ) consumer_coro = consume(queue) loop.run_until_complete(asyncio.gather(producer_coro, consumer_coro)) loop.close() ``` --- ### Streams - open_serial_connection() 是 high level async/await-ready primitives to work with network connections. - Streams allow sending and receiving data without using callbacks or low-level protocols and transports. - 會產生一個成對的 `asyncio.StreamReader` / `asyncio.StreamWriter` - 需要建立一個 read, write 的 coroutine 函式來處理字串 Note: High level async/await 用來原生連接網路 不需要回呼或是low-level protocol and transport 就可以發送和接收資料 ---- ### API ```python= async def ser_open_connection(*args, **kwargs): reader = asyncio.streams.StreamReader() protocol = asyncio.streams.StreamReaderProtocol(reader) factory = lambda: protocol transport, _ = await ser_create_connection(factory, *args, **kwargs) writer = asyncio.streams.StreamWriter(transport, protocol, reader) return reader, writer ``` ---- ### Streams 範例 ```python= import asyncio import serial_asyncio import random # serial setting url = '/dev/cu.usbmodem1421' url2 = '/dev/cu.usbserial-AL016RPE' port = 9600 async def produce(queue, url, **kwargs): """get serial data use recv() define format with non-blocking """ reader, writer = await serial_asyncio.open_serial_connection(url=url, **kwargs) buffers = recv(reader) async for buf in buffers: # TODO: can handle data format here print(f"produce id: {id(buf)}, device:{buf.split(',')[2:4]}") await asyncio.sleep(random.random()) await queue.put(buf) async def recv(r): """ Handle stream data with different StreamReader: 'read', 'readexactly', 'readuntil', or 'readline' """ while True: msg = await r.readuntil(b'\r') yield msg.rstrip().decode('utf-8') async def consume(queue): """ consume serial data from queue """ while True: # wait for an data from producer data = await queue.get() # process the data print(f"consuming id: {id(data)}") # simulate i/o operation using sleep await asyncio.sleep(random.random()) # Notify the queue that the item has been processed queue.task_done() loop = asyncio.get_event_loop() queue = asyncio.Queue(loop=loop) producer_coro = produce(queue, url=url, baudrate=port) producer_coro2 = produce(queue, url=url2, baudrate=port) consumer_coro = consume(queue) loop.run_until_complete(asyncio.gather(producer_coro, producer_coro2, consumer_coro)) loop.close() ``` --- ### 成功讓 serial port 非同步的讀與寫的控制 :100: --- ### 可能會遇到問題 - 別人開發 api 不支持 windows - 不支持 asyncio 的套件 - 有沒有其他方式? --- ### Coroutine - Generator - Future - Task Note: 協同程序,從這幾個 compoments 快速帶過 ---- ### Generator - __iter__, __next__ - 語法 yield - 控制流程 - PEP 342: 語句變成表達式 x = yield - 產生器演化成協同程序 ---- ### Future - 記錄要工作狀態 - 將來執行或沒有執行的任務的結果 - 類似 Promise ---- ### Task - 事件循環交互,執行協同程序任務 - 一個協程對象就是一個原生可以暫停執行的函數,任務則是對協程進一步封裝,其中包含任務的各種狀態。Task 對象是 Future 的子類,它將 coroutine 和 Future 聯繫在一起,將 coroutine 封裝成一個 Future 對象。 --- ### Async and sync 兩個世界的轉換 - Andrew Godwin - [Python & Async Simplified](https://www.aeracode.org/2018/02/19/python-async-simplified/) - Django ASGI - [asgiref](https://github.com/django/asgiref) - [Executing code in thread or process pool](https://docs.python.org/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools) ![Imgur](https://i.imgur.com/EZ2PMMb.png) --- ### 舉一個 Django ORM 範例 ```python= def get_chat_id(name): return Chat.objects.get(name=name).id async def main(): executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) loop = asyncio.get_event_loop() result = await loop.run_in_executor(executor, get_chat_id, "django") ``` Note: 從非同步函數中呼叫同步函數很危險,因為你不確定這是不是一個會阻塞的函數 --- ### 想法與流程大概是 - synchronous callable 變成 awaitable - 建立 concurrent.futures.ThreadPoolExecutor 讓 synchronous 跑在上面 - 建立一個 loop 並將 threadpool 的 executor 放入 loop.run_in_executor,執行完畢就會回傳一個 asyncio.Future object - 當是一個 asyncio.Future object 想當然爾就可以 await 了 p.s 上面的 futures 和 event_loop 是不同 thread 所以不需要排進 loop 內,直接用 run_in_executor 去跑 multi-thread --- ## 接著用這個想法做個範例 ---- ### 建立一個會 blocking 的函式 ```python= from serial import Serial from concurrent import futures import asyncio import time async def run_blocking_tasks(executor): s = Serial('/dev/ttys005', baudrate=9600) msgs = [b'foo\n', b'bar\n', b'baz\n', b'qux\n', b'DONE\n'] loop = asyncio.get_event_loop() f = [ loop.run_in_executor(executor, send, s, m) for m in msgs ] await asyncio.wait(f) ``` ---- ### 傳送訊息的 func ```python= def send(ser, msg): ser.write(msg) print(f'sending: {msg}') if msg.rstrip().decode() == 'foo': time.sleep(3) print(f'{msg.rstrip().decode()} sleep 3s') ``` ---- ### 建立一個 ThreadPoolExecutor 然後用 asyncio.run() 把 event loop 掛起來 ```python= if __name__ == '__main__': executor = futures.ThreadPoolExecutor(max_workers=1) asyncio.run(run_blocking_tasks(executor)) ``` ---- ### 執行結果 會發現原本 sleep 並不會讓程式阻塞住 ``` MainThread run_blocking_tasks: starting MainThread run_blocking_tasks: creating executor tasks ThreadPoolExecutor-0_0 sending: b'foo\n': running ThreadPoolExecutor-0_1 sending: b'bar\n': running ThreadPoolExecutor-0_2 sending: b'baz\n': running ThreadPoolExecutor-0_1 sending: b'qux\n': running MainThread run_blocking_tasks: waiting for executor tasks ThreadPoolExecutor-0_2 sending: b'DONE\n': running ThreadPoolExecutor-0_0 sending: b'foo\n': foo finish sleep 3s ---> “最後才完成” MainThread run_blocking_tasks: exiting ``` --- ### Sync -> Async, wrap 起來呢? ```python= @sync_to_async def get(ser): msg = ser.read_until(b'\r\n') return msg.rstrip().decode('utf-8') ``` 經過 @sync_to_async wrap 後變成一個 coroutine, 而且會是一個 await > <coroutine object SyncToAsync.__call__ at 0x1091ab1c8> >> __main__:2: RuntimeWarning: coroutine 'SyncToAsync.__call__' was never awaited RuntimeWarning: Enable tracemalloc to get the object allocation traceback ---- ### 裝飾器的說明 ```python= class SyncToAsync: def __init__(self): .... async def __call__(self, *args, **kwargs): loop = asyncio.get_event_loop() future = loop.run_in_executor( None, functools.partial(self.thread_handler, loop, *args, **kwargs), ) return await asyncio.wait_for(future, timeout=None) sync_to_async = SyncToAsync ``` ---- ### 最後可以 await 就 4 爽 ```python= msg = await get(ser) #--> await ``` ```python= import asyncio from serial import Serial async def produce(queue, url, **kwargs): ser = Serial(url, baudrate=9600) while True: msg = await get(ser) #--> await print(f"produce id: {id(msg)}, device:{msg.split(',')[2:4]}, msg:{repr(msg)}") await queue.put(msg) await asyncio.sleep(random.random()) ``` --- ### 大家都有同樣想法:side project 時間! - [awesome asyncio](https://github.com/timofurrer/awesome-asyncio) - [aioserial](https://github.com/changyuheng/aioserial) ---- ```python= import aioserial import asyncio async def read_and_print(aioserial_instance: aioserial.AioSerial): while True: print((await aioserial_instance.read_async()).decode(errors='ignore'), end='', flush=True) asyncio.run(read_and_print(aioserial.AioSerial(port='COM1'))) ``` --- ### Thank you! :sheep: You can find me on - [GitHub](https://github.com/chairco) - [Twitter](https://twitter.com/home?lang=zh-tw) - or [email](chairco@gmail.com) me
{"metaMigratedAt":"2023-06-15T00:02:06.774Z","metaMigratedFrom":"YAML","title":"async def Serial() await Serial_connection","breaks":true,"description":"View the slide with \"Slide Mode\".","slideOptions":"{\"theme\":\"solarized\",\"transition\":\"fade\",\"allottedMinutes\":28,\"spotlight\":{\"enabled\":false}}","contributors":"[{\"id\":\"52fcffbd-98fb-4986-a324-085d370c6514\",\"add\":28767,\"del\":7672}]"}
    3983 views