changed 3 years ago
Linked with GitHub

fin-pyコードリーディング会#3
Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

概要

fin-pyについて

https://github.com/fin-py/guideline

次回のイベント

https://fin-py.connpass.com/event/224843/

Slack

https://docs.google.com/forms/d/e/1FAIpQLSd9oVlrCMHEuD3PN0x3QcMgeQGy6Sj90d6uP1CQXQnArX9YqQ/viewform

参加方法

fin-pyコードリーディング会#3
2021年10月1日 07:00 PM 大阪、札幌、東京

終了

重要

Zoomに参加したら、出欠確認にチェックを入れてください

前回の様子

タイムテーブル
Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

  1. 発表者枠が話す: 各自20-30min
  2. クロージング(コードを読んだ感想など): 20min

コードリーディング

読むコード

pybotters.ws.py

発表者のかたは、コードを読んだ際のメモ書きなどを下記に記載してください
ほかの形式でまとめたかたは要点(もし可能であれば資料のリンク)を記載してください

参考情報

自己紹介・コメント・メモ

  • にぎやかし枠、発表者枠のかたは下記に自己紹介を記入してください
    • pybottersを使っているかなどの情報があると嬉しいです
  • イベントのフィードバックや感想などを追記していただけると助かります

どりらん

  • Twitter: patraqushe
  • オプションをトレードしてます
  • 感想

しんせいたろう

  • Twitter: shinseitaro
  • 米国株とか売ったり買ったりしてるひと。pythonは長く書いてるわりに分かってないひと。クラスとかいまだ苦手
  • 感想

やろころ

  • FX、米国株、オプションをちょっとやってます
  • 感想

KanSAKAMOTO

  • にぎやかし枠に当選しました。前回のAIOHTTPハンズオンからの参加です。そのハンズオンに参加するのにWindows10ではバグがあったため入るのに苦労して遅刻してしまいましたが、今回の会はすんなり入れてホッとしています

MeshiKutteNeru

  • にぎやかし枠参加です。金融系のデータ分析やさん見習いをしています。

発表者

しんせいたろう

初めて使う標準ライブラリ

from dataclasses import dataclass
  • dataclass
  • dataclasses
    • このモジュールは、__init__()__repr__() のような special method を生成し、ユーザー定義のクラスに自動的に追加するデコレータや関数を提供します。
    • special method(特殊メソッド): __名前__() の形のメソッドで、Pythonが暗黙的に呼び出すメソッド
  • [詳解] Pythonのdataclasses
    • クラス変数を初期化するための__init__()を自動生成してくれるため、クラスの定義がシンプルに
    • Python3.7から追加
    ​​​​# mytest.py ​​​​from dataclasses import dataclass ​​​​@dataclass ​​​​class Person: ​​​​ firstname: str ​​​​ lastname: str ​​​​ def fullname(self) -> str: ​​​​ return f"{self.lastname.upper()}, {self.firstname.title()}" ​​​​p = Person("taro", "shinsei") ​​​​print(p) ​​​​print(p.fullname()) ​​​​class Person_old: ​​​​ def __init__(self, firstname: str, lastname: str) -> None: ​​​​ self.firstname = firstname ​​​​ self.lastname = lastname ​​​​ def fullname(self) -> str: ​​​​ return f"{self.lastname.upper()}, {self.firstname.title()}" ​​​​p1 = Person_old("taro", "shinsei") ​​​​print(p1) ​​​​print(p1.fullname())
    ​​​​❯ python mytest.py ​​​​Person(firstname='taro', lastname='shinsei') ​​​​SHINSEI, Taro ​​​​<__main__.Person_old object at 0x7f90346b1690> ​​​​SHINSEI, Taro
    • @dataclass を使うと
      1. __init__ を使わなくても良い(コードがスッキリ)
      2. namedtuple で値が返る
        • named tuple 使ってもいいけど、メソッドとか一緒に作れるから便利
      3. frozen=True を渡すと immutable に出来る
      ​​​​​​​​@dataclass(frozen=True)
      ​​​​​​​​class Person:
      ​​​​​​​​    firstname: str
      ​​​​​​​​    lastname: str
      
      ​​​​​​​​    def fullname(self) -> str: 
      ​​​​​​​​        return f"{self.lastname.upper()}, {self.firstname.title()}"
      
      ​​​​​​​​p = Person("taro", "shinsei")
      ​​​​​​​​p.firstname="jiro"
      
      ​​​​​​​​❯ python mytest.py ​​​​​​​​Traceback (most recent call last): ​​​​​​​​ File "mytest2.py", line 12, in <module> ​​​​​​​​ p.firstname="jiro" ​​​​​​​​ File "<string>", line 3, in __setattr__ ​​​​​​​​dataclasses.FrozenInstanceError: cannot assign to field 'firstname'
from secrets import token_hex
  • ws.py 137行目 bitflyer の AUTHのための nonce と signiture のために使われている
  • secrets - 機密を扱うために安全な乱数を生成する
  • secrets.token_hex([nbytes=None])
    • 十六進数のランダムなテキスト文字列を返します。文字列は nbytes のランダムなバイトを持ち、各バイトは二つの十六進数に変換されます。nbytes が None の場合や与えられなかった場合は妥当なデフォルト値が使われます。
    ​​​​>>> token_hex(16)  
    ​​​​'f9bf78b9a18ce6d46a0cd2b0b86df9da'    
    

初めて使うサードパーティライブラリ

# ws.py 13行目
from aiohttp.http_websocket import json
from aiohttp.typedefs import StrOrURL
# aiohttp/typedefs.py
StrOrURL = Union[str, URL]

コードリーディング

  • ws_run_forever を中心に

  • ws_run_forever が使われているところ

    • client.pyClient クラスの ws_connect メソッド
    • 最終的にはユーザが各取引所のWebSocket API を利用するために ws_connect を呼び出す
    • この時、Clientws_response_class=ClientWebSocketResponse,
    ​​​​# pybotters/client.py 31行目〜
    ​​​​class Client:
    ​​​​    _session: aiohttp.ClientSession
    ​​​​    _base_url: str
    
    ​​​​    def __init__(
    ​​​​        self,
    ​​​​        apis: Union[Dict[str, List[str]], str] = {},
    ​​​​        base_url: str = '',
    ​​​​        **kwargs: Any,
    ​​​​    ) -> None:
    ​​​​        self._session = aiohttp.ClientSession(
    ​​​​            request_class=ClientRequest,
    ​​​​            ws_response_class=ClientWebSocketResponse, # ←ここ
    ​​​​            **kwargs,
    ​​​​        )
    ​​​​        apis = self._load_apis(apis)
    ​​​​        self._session.__dict__['_apis'] = self._encode_apis(apis)
    ​​​​        self._base_url = base_url
    
    ​​​​# pybotters/client.py 117行目〜
    ​​​​    async def ws_connect(
    ​​​​        self,
    ​​​​        url: str,
    ​​​​        *,
    ​​​​        send_str: Optional[Union[str, List[str]]] = None,
    ​​​​        send_json: Any = None,
    ​​​​        hdlr_str: Optional[WsStrHandler] = None,
    ​​​​        hdlr_json: Optional[WsJsonHandler] = None,
    ​​​​        **kwargs: Any,
    ​​​​    ) -> asyncio.Task:
    ​​​​        event = asyncio.Event()
    ​​​​        task = asyncio.create_task(
    ​​​​            ws_run_forever( # ← ここ
    ​​​​                url,
    ​​​​                self._session,
    ​​​​                event,
    ​​​​                send_str=send_str,
    ​​​​                send_json=send_json,
    ​​​​                hdlr_str=hdlr_str,
    ​​​​                hdlr_json=hdlr_json,
    ​​​​                **kwargs,
    ​​​​            )
    ​​​​        )
    ​​​​        await event.wait()
    ​​​​        return task
    
    ​​​​# ユーザスクリプト
    ​​​​async def main():
    ​​​​    async with pybotters.Client(apis=apis) as client:
    ​​​​        wstask = await client.ws_connect( # ← ここ
    ​​​​            'wss://...',
    ​​​​            send_json={'foo': 'bar'},
    ​​​​            hdlr_json=pybotters.print_handler,
    ​​​​            # OR string
    ​​​​            # send_str='{"foo":"bar"}',
    ​​​​            # hdlr_str=pybotters.print_handler,
    ​​​​            # OR Multiple request
    ​​​​            # send_json=[{'foo': 'bar'}, {'baz': 'foobar'}],
    ​​​​            # send_str=['{"foo": "bar"}', '{"baz": "foobar"}'],
    ​​​​        )
    ​​​​        await wstask
    
    
# ws.py 23 行目〜
async def ws_run_forever(
    url: StrOrURL,
    session: aiohttp.ClientSession,
    event: asyncio.Event,
    *,
    send_str: Optional[Union[str, List[str]]] = None,
    send_json: Any = None,
    hdlr_str=None, # WsStrHandler
    hdlr_json=None, # WsJsonHandler
    auth=_Auth,
    **kwargs: Any,
) -> None:
  • WsJsonHandler
    ​​​​# pybotters/typedefs.py
    ​​​​WsStrHandler = Callable[
    ​​​​    [str, ClientWebSocketResponse], Optional[Coroutine[Any, Any, None]]
    ​​​​]
    ​​​​WsJsonHandler = Callable[
    ​​​​    [Any, ClientWebSocketResponse], Optional[Coroutine[Any, Any, None]]
    ​​​​]    
    
    • typing.Callable
    • Callable[[引数の型], [返り値の型]]
    • Optional[]: None もしくは指定の型
    • WsStrHandler は 関数(Callable)で、引数は strClientWebSocketResponseで、返り値は、None もしくは Coroutineオブジェクト
    • [Coroutine[Any, Any, None]: typing.Coroutineに説明があるけど、サッパリわからない
# ws.py 35 行目〜
    if all([hdlr_str is None, hdlr_json is None]):
        hdlr_json = pybotters.print_handler

    iscorofunc_str = asyncio.iscoroutinefunction(hdlr_str)
    iscorofunc_json = asyncio.iscoroutinefunction(hdlr_json)
  • pybotters の wiki
    の説明にある「受信したメッセージを処理するハンドラ関数の指定」の
    実装部分
  • asyncio.iscoroutinefunction
  • iscoroutinefunction(func): func がコルーチン関数であれば True
# ws.py 39 行目〜
    while not session.closed:
        cooldown = asyncio.create_task(asyncio.sleep(60.0))
  • 以前まちゅけんさんが仰ってた、何かしらの理由でsessionが閉じた時の処理
  • sleep させるタスクを作っておいて、sessionが閉じると except 文に移り、最終的には 85行目の await cooldown
# ws.py 41 行目〜
        try:
            async with session.ws_connect(url, auth=auth, **kwargs) as ws:
                event.set()
  • event.set() の event は ws_connect を初期化した時に生成される asyncio.Event()
  • (くわしくはどりらんさんへ)
  • Eventオブジェクト
    • 複数の task に event が発生したことを通知出来るオブジェクト
    • .set() メソッドで True フラグ、
    • .clear() メソッドで False を内部的にたてて
    • .wait() メソッドはTrueフラグがたつまで待たせる
  • client.pyws_connect で、await event.wait() しているのでTrueになるまでまってる状態
  • よって、ここのevent.set()でTrueを立てて処理開始
# ws.py 43 行目〜                
                if '_authtask' in ws.__dict__:
                    await ws.__dict__['_authtask']
  • 認証処理
  • '_authtask' は、ClientWebSocketResponse で定義
# ws.py 291 行目〜
class ClientWebSocketResponse(aiohttp.ClientWebSocketResponse):
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        if self._response.url.host in HeartbeatHosts.items:
            self.__dict__['_pingtask'] = asyncio.create_task(
                HeartbeatHosts.items[self._response.url.host](self)
            )
        if self._response.__dict__['_auth'] is _Auth:
            if self._response.url.host in AuthHosts.items:
                if (
                    AuthHosts.items[self._response.url.host].name
                    in self._response._session.__dict__['_apis']
                ):
                    self.__dict__['_authtask'] = asyncio.create_task(
                        AuthHosts.items[self._response.url.host].func(self)
                    )
        self._lock = asyncio.Lock()
  • AuthHosts
    ​​​​# ws.py 254 行目〜
    ​​​​@dataclass
    ​​​​class Item:
    ​​​​    name: str
    ​​​​    func: Any
    
    ​​​​# ws.py 281 行目〜
    ​​​​class AuthHosts:
    ​​​​    items = {
    ​​​​        'ws.lightstream.bitflyer.com': Item('bitflyer', Auth.bitflyer),
    ​​​​        'tap.liquid.com': Item('liquid', Auth.liquid),
    ​​​​        'ftx.com': Item('ftx', Auth.ftx),
    ​​​​        'phemex.com': Item('phemex', Auth.phemex),
    ​​​​        'testnet.phemex.com': Item('phemex_testnet', Auth.phemex),
    ​​​​    }
    
    • 取引所のホスト名をキー
    • 取引所名と認証関数をデータの namedtuple を値
    • として持つ辞書
    • (脱線ですが、pybotters/typedefs.pyItem = Dict[str, Any] という定義がありました。ここではdataclass を使って定義したのはどういう意図でしょうか?)
  • self.__dict__['_authtask'] に、この辞書を使って各取引所認証メソッドを格納
  • await ws.__dict__['_authtask'] で待機させている
  • self._lock = asyncio.Lock()
    • asyncio.Lock
    • mutex lock: 排他ロック(?)
    • heartbeat の処理と auth の処理はちゃんと終わるまで他が割り込まないようにしてる?
# ws.py 46 行目〜
                if send_str is not None:
                    if isinstance(send_str, list):
                        await asyncio.gather(*[ws.send_str(item) for item in send_str])
                    else:
                        await ws.send_str(send_str)
                if send_json is not None:
                    if isinstance(send_json, list):
                        await asyncio.gather(
                            *[ws.send_json(item) for item in send_json]
                        )
                    else:
                        await ws.send_json(send_json)
  • サブスクライブしたいデータを、文字列もしくはJSONで登録処理して待機させる
# ws.py 58 行目〜                        
                async for msg in ws:
                    if msg.type == aiohttp.WSMsgType.TEXT:
                        if hdlr_str is not None:
                            try:
                                if iscorofunc_str:
                                    await hdlr_str(msg.data, ws)
                                else:
                                    hdlr_str(msg.data, ws)
                            except Exception as e:
                                logger.error(repr(e))
  • ウェブソケットで受け取ったデータの処理。ハンドラ関数で切り分け。
  • hdlr_str は None もしくは WsStrHandler(Coroutine) なので、else 文にはどういう時に入る?
                        if hdlr_json is not None:
                            try:
                                data = msg.json()
                            except json.decoder.JSONDecodeError:
                                pass
                            else:
                                try:
                                    if iscorofunc_json:
                                        await hdlr_json(data, ws)
                                    else:
                                        hdlr_json(data, ws)
                                except Exception as e:
                                    logger.error(repr(e))
                    elif msg.type == aiohttp.WSMsgType.ERROR:
                        break
  • 上記の json 版
# ws.py 85 行目〜                     
        except (aiohttp.WSServerHandshakeError, aiohttp.ClientOSError) as e:
            logger.warning(repr(e))
        await cooldown

どりらん

WebSocket API (WebSockets)

https://developer.mozilla.org/ja/docs/Web/API/WebSockets_API

  • ユーザのブラウザとサーバ間で対話的な通信セッションを開くことができる技術
  • サーバにメッセージを送信したり、応答をサーバにポーリングすることなく、イベント駆動型のレスポンスを受信したりすることができる
インターフェイス
WebSocket
WebSocket サーバに接続し、その接続を通じてデータを送受信するための主要インターフェイス
CloseEvent
接続が閉じた時に WebSocket オブジェクトによって送信されるイベント
MessageEvent
サーバからメッセージを受信した時に WebSocket オブジェクトによって送信されるイベント

WebSocketに対応しているPythonパッケージ

HTTPXはWebSocketに対応していない

https://github.com/encode/httpx/issues/304

asyncio.Event

https://github.com/MtkN1/pybotters/blob/f90a05b151b483dd2a8ccd110a4a09896eef3d67/pybotters/ws.py#L26

https://docs.python.org/ja/3/library/asyncio-sync.html#asyncio.Event

  • スレッドセーフではないイベント
  • 何らかのイベントが発生したことを複数のTaskに通知できる
  • 内部フラグで管理
    • setメソッド -> True
    • clearメソッド -> False
  • フラグがTrueになるまでブロックされる(イニシャライズ時はFalse)
wait
  • イベントがsetされるまて待つ(ブロックする)
  • イベントがsetされている場合はTrueを返す
set
  • イベントをsetする
  • 待機されたTaskが起動する
clear
  • イベントをunsetする
  • 待機された(waitした)Taskは、再度setが呼ばれるまでブロックされる
is_set
イベントがsetされていた場合にTrueを返す

サンプルコード: フラグがTrueのときだけsleeper_taskが再開される

import asyncio import random async def sleeper(event): print("寝てます") await event.wait() print(f"{event.is_set()=}") print("起きました") await asyncio.sleep(3) print("歯磨きしました") async def dancer(): for _ in range(3): print("踊ってます") await asyncio.sleep(1) async def main(): event = asyncio.Event() sleeper_task = asyncio.create_task(sleeper(event)) dancer_task = asyncio.create_task(dancer()) print(f"{event.is_set()=}") if random.choice((True, False)): event.set() await sleeper_task await dancer_task asyncio.run(main())

asyncio.iscoroutinefunction

https://github.com/MtkN1/pybotters/blob/f90a05b151b483dd2a8ccd110a4a09896eef3d67/pybotters/ws.py#L37

https://docs.python.org/ja/3/library/asyncio-task.html#asyncio.iscoroutinefunction

import asyncio import inspect async def async_func(): return @asyncio.coroutine def generator_func(): yield async def main(): print(f"{asyncio.iscoroutinefunction(async_func)=}") print(f"{asyncio.iscoroutinefunction(generator_func)=}") print(f"{inspect.iscoroutinefunction(async_func)=}") print(f"{inspect.iscoroutinefunction(generator_func)=}") asyncio.run(main())
DeprecationWarning: "@coroutine" decorator is deprecated since Python 3.8, use "async def" instead
  def generator_func():
asyncio.iscoroutinefunction(async_func)=True
asyncio.iscoroutinefunction(generator_func)=True
inspect.iscoroutinefunction(async_func)=True
inspect.iscoroutinefunction(generator_func)=False

インスタンスがコルーチンであるかを確認する関数

asyncio.iscoroutine
ジェネレータベースのコルーチンもTrueを返す
inspect.iscoroutine
async defで定義されたコルーチンのみTrueを返す

asyncio.Lock

https://github.com/MtkN1/pybotters/blob/f90a05b151b483dd2a8ccd110a4a09896eef3d67/pybotters/ws.py#L307

https://docs.python.org/ja/3/library/asyncio-sync.html#asyncio.Lock

  • asyncioタスクのミューテックスロックを実装
  • スレッドセーフではない
  • 非同期ロックを使用して、共有リソースへの排他的アクセスを保証できる

コードサンプル:

  • 1回目にGETリクエストした結果はキャッシュ(メモ化)する
  • 2回目以降はキャッシュを参照し、GETリクエストしない
  • タスクが同時に起動しても、ロックされているので、GETリクエストが1回だけになる
  • ロックされていないタスクは非同期に実行される
import aiohttp import asyncio import functools @functools.cache async def fetch(): url = "https://httpbin.org/delay/3" async with aiohttp.ClientSession() as client, client.get(url) as resp: assert resp.status == 200 return await resp.text() async def main(): async with asyncio.Lock(): return await asyncio.gather(fetch(), fetch()) await asyncio.sleep(3) asyncio.run(main())

https://github.com/MtkN1/pybotters/blob/f90a05b151b483dd2a8ccd110a4a09896eef3d67/pybotters/ws.py#L319

RequestLimit.gmocoinでロックをしているのは、GMOコインが多重アクセスを禁止しているからでしょうか?

aiohttp.ClientSession.ws_connect

https://github.com/MtkN1/pybotters/blob/f90a05b151b483dd2a8ccd110a4a09896eef3d67/pybotters/ws.py#L42

https://docs.aiohttp.org/en/stable/client_reference.html?highlight=ws_connect#aiohttp.ClientSession.ws_connect

  • Create a websocket connection. Returns a ClientWebSocketResponse object.
  • WebSocketコネクションを作成し、ClientWebSocketResponseオブジェクトを返す

ClientWebSocketResponse

https://docs.aiohttp.org/en/stable/client_reference.html?highlight=ClientWebSocketResponse#aiohttp.ClientWebSocketResponse

receive
  • A coroutine that waits upcoming data message from peer and returns it.
  • The coroutine implicitly handles PING, PONG and CLOSE without returning the message.
  • It process ping-pong game and performs closing handshake internally.
closed
Read-only property, True if close() has been called or CLOSE message has been received from peer.
send_str
Send data to peer as TEXT message.
pong
Send PONG to peer.
send_json
Send data to peer as JSON string.

aiohttp.WSMessage

https://docs.aiohttp.org/en/stable/websocket_utilities.html?highlight=WSMsgType#aiohttp.WSMessage

  • receive()が呼ばれたときのWebSocketメッセージ

aiohttp.WSMsgType

https://github.com/MtkN1/pybotters/blob/f90a05b151b483dd2a8ccd110a4a09896eef3d67/pybotters/ws.py#L59

https://docs.aiohttp.org/en/stable/websocket_utilities.html?highlight=WSMsgType#aiohttp.WSMsgType

気になった点

https://github.com/MtkN1/pybotters/blob/f90a05b151b483dd2a8ccd110a4a09896eef3d67/pybotters/ws.py#L13

http_websocket.pyでは標準ライブラリのjsonをimportしているので、import jsonでよいのではないかと思いました

https://github.com/MtkN1/pybotters/blob/f90a05b151b483dd2a8ccd110a4a09896eef3d67/pybotters/ws.py#L18

from .auth import Auth as _Auth

同じモジュールのAuthクラスとの衝突を避けるなら、from . import authとして、auth.Authで呼び出せばよいのではないかと思いました

https://github.com/MtkN1/pybotters/blob/f90a05b151b483dd2a8ccd110a4a09896eef3d67/pybotters/ws.py#L30

hdlr_strhdlr_jsonCallableの型ヒントつけられないかなと思いました

https://github.com/MtkN1/pybotters/blob/f90a05b151b483dd2a8ccd110a4a09896eef3d67/pybotters/ws.py#L44

hasattr(ws, "_authtask")でよいのではないかと思いました


やろころ

読み方

  • 細かい文法は無視
  • 概要を把握

概要

  • import
  • logger取得
  • async def ws_run_forever() -> None # client.pyのWebSocket接続時-ws_conncet()-に実行する無限ループ
  • class Heartbeat # 各取引所毎: pingを打って、sleepする
    • async def bybit()
    • async def bitbank()
    • async def liquid()
    • async def ftx()
    • async def binance()
    • async def phemex()
  • class Auth # 各取引所毎: Authを行うリクエストを送信し、レスポンスを解析(OK/ERROR)
    • async def bitflyer()
    • async def liquid()
    • async def ftx()
    • async def phemex()
  • class Item
  • class HeartbeatHosts # ホスト名からHearbeat staticmethodを引く
    • items
  • class AuthHosts # ホスト名からAuth staticmethodを引く
    • items
  • class ClientWebSocketResponse()
    • def __init__() -> None # レスポンスがHeartbeatかAuthに応じて、タスクを生成
    • async def send_str() -> None # RequestLimitHostを考慮しつつ、データ送信
  • class RequestLimit # データ送信で考慮が必要な取引所
    • sync def gmocoin() # GMO向け実装
      class RequestLimitHosts # ホスト名からRequestLimitの staticmethodを引く
    • items

出席確認

  • shinseitaro
  • driller
  • MtkN1
  • saru999
  • KanSAKAMOTO
  • takei_y
  • keen-06
  • k_kishi
  • f01415272
  • ToshikiUrasaki
  • yoghurppe
  • nrhide
  • MeshiKutteNeru
  • yarokoro

次回

  • __init__.py
  • auth.py
  • client.py
  • request.py
  • store.py
  • typedefs.py
  • ws.py
  • tests
  • docs (ドキュメント関連)
  • pyproject.toml (パッケージ関連)
  • pytest.yml (CI, GitHub Actions)

次回はstore.py

10/13(水) 19:00-21:00

Select a repo