https://github.com/fin-py/guideline
TBD
https://docs.google.com/forms/d/e/1FAIpQLSd9oVlrCMHEuD3PN0x3QcMgeQGy6Sj90d6uP1CQXQnArX9YqQ/viewform
fin-pyコードリーディング会#4
終了
重要
Zoomに参加したら、出欠確認にチェックを入れてください
発表者のかたは、コードを読んだ際のメモ書きなどを下記に記載してください
ほかの形式でまとめたかたは要点(もし可能であれば資料のリンク)を記載してください
DataStore を直接インスタンス化して使うことを意図していないなら abc を使って抽象基底クラスにすると、利用者に継承して使うものであることを設計者の意図として伝えられる。
from abc import ABC
class DataStore(ABC):
...
抽象基底クラスは、設計者の意図を伝えられるというメリットだけでなく、例えば @abstractmethod
でサブクラスのモデルそれぞれで実装が必要なメソッドを明示して、未実装の場合は、インスタンス化したときにエラーにしたりできる。テストとも相性がよいはず。
動作確認。
>>> from abc import *
>>> class A(ABC):
... @abstractmethod
... def myfunc(self):
... pass
...
>>>
>>> class B(A): pass
...
>>> b = B()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: Can't instantiate abstract class B with abstract methods myfunc
クラス変数の _KEYS
からインスタンス変数を初期化するのはあまり意義があるようにはみえないかな。明示的に __init__
で keys を渡すか、_init
で渡すようなやり方の方がいいのでは? self._KEYS
を直接参照しているコードもなさそうだし。
_KEYS = []
...
self._keys: Tuple[str, ...] = tuple(keys if keys else self._KEYS)
iter
いるのかな?と思ったけど、これは型チェックのために必要なのかな?
def __iter__(self) -> Iterator[Item]:
return iter(self._data.values())
FAILED tests/test_store.py::test__iter__ - TypeError: iter() returned non-iterator of type 'dict_values'
一般論として例外を無視するコードはよくない。
どうしても仕方ないときは logging.debug などを使って、通常 (infoレベル) はログ出力しないけど、調査したいときに切り替えられるようにしておくとまだよいとは思う。
try:
keyitem = {k: item[k] for k in self._keys}
except KeyError:
pass
else:
...
wait を呼び出したときに await event.wait()
でコルーチンが切り替わっているときに ret が変更されることを期待しているコードにみえる。イベントの登録と値の取得は queue のようなものでやった方が保守しやすいコードになるような気がする。
少なくともこのコードだけ読んだら ret は空リストが返るようにしか読めない。
async def wait(self) -> List[Item]:
event = asyncio.Event()
ret = []
self._events[event] = ret
await event.wait()
del self._events[event]
return ret
これも一般論として Manager
という名前は役割が曖昧になるのであまり使われない名前になりつつある気がしている。このクラスは DataStore と asyncio.Event の扱いがセットになっていてその役割の曖昧さが Manager
という名前に現れている気がする。DataStore からみたらコレクションにみえるし、Event からみたらイベントハンドラーにみえる。リファクタリングするなら2つに分割して、それぞれの責務に限定した方が保守はしやすくなる気がする。
これはどこで使っている?
self._iscorofunc = asyncio.iscoroutinefunction(self._onmessage)
この処理は同じにみえるので Pythonic な概念だと、どちらかのやり方に統一した方がよいのではないか?
def __getitem__(self, name: str) -> Optional['DataStore']:
return self._stores[name]
def get(self, name: str, type: Type[TDataStore]) -> TDataStore:
return cast(type, self._stores.get(name))
動作確認。
>>> class A:
... def __init__(self):
... self.data = {}
... def __getitem__(self, name):
... return self.data[name]
... def get(self, name):
... return self.data.get(name)
...
>>> a = A()
>>> a.data['x'] = 1
>>> a['x']
1
>>> a.get('x')
1
def __init__(self, keys: List[str] = [], data: List[Item] = []) -> None:
self._data: Dict[uuid.UUID, Item] = {}
self._index: Dict[int, uuid.UUID] = {}
self._keys: Tuple[str, ...] = tuple(keys if keys else self._KEYS)
self._events: Dict[asyncio.Event, List[Item]] = {}
self._insert(data)
list[str]
のように書けるfrom __future__ import annotations
でジェネリクスが使える...
と書けるnumpyの例
>>> import numpy as np
>>>
>>> data = np.arange(16).reshape((2, 2, 2, 2))
>>> data[0, :, :, 1]
array([[1, 3],
[5, 7]])
...
で省略できる
>>> data[0, ..., 1]
array([[1, 3],
[5, 7]])
def _insert(self, data: List[Item]) -> None:
if self._keys:
for item in data:
try:
keyitem = {k: item[k] for k in self._keys}
except KeyError:
pass
_insert
_update
_delete
_sweep_with_key
_sweep_without_key
get
pop
keyitem
はコンストラクタに入れてインスタンス属性にはできない?
keys = [next(_iter) for _ in range(over)]
for k in keys:
del self._data[self._index[k]]
del self._index[k]
for _ in range(over):
k = next(_iter)
del self._data[self._index[k]]
del self._index[k]
>>> import itertools
>>>
>>> di = {k: v for k, v in zip(range(5), "abcde")}
>>> di
{0: 'a', 1: 'b', 2: 'c', 3: 'd', 4: 'e'}
>>> over = 3
>>> dict(islice(di.items(), over, len(di)))
>>> {3: 'd', 4: 'e'}
_sweep_with_key
と _sweep_without_key
は同様な処理なので、1つのメソッドにまとめられそう
def _pop(self, item: Item) -> Optional[Item]:
if self._keys:
try:
keyitem = {k: item[k] for k in self._keys}
except KeyError:
pass
else:
keyhash = self._hash(keyitem)
if keyhash in self._index:
ret = self._data[self._index[keyhash]]
del self._data[self._index[keyhash]]
del self._index[keyhash]
return ret
_pop
で、処理もpopに近いので、 del
よりは pop
をにしたほうがわかりやすそう
def find(self, query: Item = {}) -> List[Item]:
if query:
return [
item
for item in self
if all(k in item and query[k] == item[k] for k in query)
]
else:
return list(self)
async def wait(self) -> List[Item]:
event = asyncio.Event()
ret = []
self._events[event] = ret
await event.wait()
del self._events[event]
return ret
bound
で DataStore
クラスが親クラスであることを指定models
の親クラス__getitem__
と __contains__
が実装されているので、コンテナ型として扱えるスクリプト
import asyncio
import pybotters
from logzero import logger
# apis
apis = {"ftx": ["", ""]}
async def main(market):
async with pybotters.Client(apis=apis) as client:
# DataStoreManagerインスタンス化
store = pybotters.FTXDataStore()
wstask = await client.ws_connect(
"wss://ftx.com/ws/",
send_json=[
{"op": "subscribe", "channel": "orderbook", "market": market},
# {"op": "subscribe", "channel": "trades", "market": market},
# {"op": "subscribe", "channel": "ticker", "market": market},
],
# ウェブソケットデータを受信したら、onmessage関数で処理するように設定
hdlr_json=store.onmessage,
)
while True:
await store.wait()
# https://docs.ftx.com/#orderbooks
# デフォルトでBest100が流れてくるので今回は見やすさ重視で1つだけ表示
# debug, warn, info には意味はなく、ただログ出力時の見やすさ(ログ色分け)のために使用
logger.debug(store.orderbook.find({"side": "buy"})[:1])
logger.warn(store.orderbook.find({"side": "sell"})[:1])
logger.info(store.ticker.find())
# logger.info(list(store.orderbook))
# 非同期メイン関数を実行(Ctrl+Cで終了)
if __name__ == "__main__":
market = "FTT-PERP"
try:
asyncio.run(main(market))
except KeyboardInterrupt:
pass
出力
[D 211012 18:36:46 test:27] [{'market': 'FTT-PERP', 'side': 'buy', 'price': 53.2035, 'size': 14.3}]
[W 211012 18:36:46 test:28] [{'market': 'FTT-PERP', 'side': 'sell', 'price': 53.2155, 'size': 58.0}]
[I 211012 18:36:46 test:29] [{'market': 'FTT-PERP', 'bid': 53.2035, 'ask': 53.2155, 'bidSize': 14.3, 'askSize': 58.0, 'last': 53.215, 'time': 1634031406.137918}]
読みたいところ:
[[best price, size], [next best price, size]... ]
{'market': 'FTT-PERP', 'side': 'buy', 'price': 53.2035, 'size': 14.3}
)に変換しているところを読むhdlr_json=store.onmessage
して、ウェブソケットデータを受信したら、onmessage関数で処理するように設定してあるのでpybotters.FTXDataStore()
を読むmodels/ftx.py
15行目
class FTXDataStore(DataStoreManager):
def _init(self) -> None:
self.create('ticker', datastore_class=Ticker)
self.create('markets', datastore_class=Markets)
self.create('trades', datastore_class=Trades)
self.create('orderbook', datastore_class=OrderBook)
:
:
FTXDataStore()
をインスタンス化する時に、create
メソッドを使って、チャネル名ごとに datastore_class
を指定datastore_class
は OrderBook
createメソッド: store.py
208行目
def create(
self,
name: str,
*,
keys: List[str] = [],
data: List[Item] = [],
datastore_class: Type[DataStore] = DataStore,
) -> None:
self._stores[name] = datastore_class(keys, data)
self._stores
に、データストアオブジェクトを格納OrderBook データストアクラス: models/ftx.py
117行目
class OrderBook(DataStore):
_KEYS = ['market', 'side', 'price']
_BDSIDE = {'sell': 'asks', 'buy': 'bids'}
def sorted(self, query: Item = {}) -> Dict[str, List[float]]:
result = {'asks': [], 'bids': []}
for item in self:
if all(k in item and query[k] == item[k] for k in query):
result[self._BDSIDE[item['side']]].append([item['price'], item['size']])
result['asks'].sort(key=lambda x: x[0])
result['bids'].sort(key=lambda x: x[0], reverse=True)
return result
def _onmessage(self, market: str, data: List[Item]) -> None:
if data['action'] == 'partial':
result = self.find({'market': market})
self._delete(result)
for boardside, side in (('bids', 'buy'), ('asks', 'sell')):
for item in data[boardside]:
if item[1]:
self._update(
[
{
'market': market,
'side': side,
'price': item[0],
'size': item[1],
}
]
)
else:
self._delete([{'market': market, 'side': side, 'price': item[0]}])
{'time': 1634088862.3278034,
'checksum': 1908033768,
'bids': [[53.5175, 19.0], [53.5055, 6.4], ...], # [price, size] で、Best 100 の 板情報が 入る
'asks': [[53.524, 7.7], [53.5245, 43.2], ...], # 〃
'action': 'partial'}
{'market': 'FTT-PERP', 'side': 'buy', 'price': 53.2035, 'size': 14.3}
に変換されている。sorted
関数は使われてないっぽい)self._update()
: store.py
67行目
def _update(self, data: List[Item]) -> None:
if self._keys:
for item in data:
try:
keyitem = {k: item[k] for k in self._keys}
# {'market': 'FTT-PERP', 'side': 'buy', 'price': 52.9075}
except KeyError:
pass
else:
keyhash = self._hash(keyitem)
if keyhash in self._index:
self._data[self._index[keyhash]].update(item)
else:
_id = uuid.uuid4()
self._data[_id] = item
self._index[keyhash] = _id
self._sweep_with_key()
else:
for item in data:
_id = uuid.uuid4()
self._data[_id] = item
self._sweep_without_key()
# !TODO! This behaviour might be undesirable.
self._set(data)
self._keys
は OrderBook
で定義されている _KEYS = ['market', 'side', 'price']
{'market': 'FTT-PERP', 'side': 'buy', 'price': 52.9075}
という形のデータから keyhash を作成self._data
に、keyhash と 板情報のデータを追加self._data
は
{UUID('ac7be6f4-e2cf-4082-b89e-55ac63ee0387'): {'market': 'FTT-PERP', 'side': 'buy', 'price': 52.733, 'size': 27.0},
UUID('fb27567f-b809-4fa2-8683-0f15eda14e93'): {'market': 'FTT-PERP', 'side': 'buy', 'price': 52.7325, 'size': 1.5},
UUID('c74b5ec0-1014-4387-be7b-a56677a5161c'): {'market': 'FTT-PERP', 'side': 'buy', 'price': 52.7255, 'size': 40.8},.... }
__iter__
で return されているので、ユーザは store.orderbook
でアクセス可store.orderbook.find({"side": "buy"}
: store.py
153行目
def find(self, query: Item = {}) -> List[Item]:
if query:
return [
item
for item in self
if all(k in item and query[k] == item[k] for k in query)
]
else:
return list(self)
__iter__
で取得できるデータ。つまり self._data.values()
{'side': 'buy'}
)にマッチしたデータだけフィルタして返すまとめ:
DataStore
と DataStoreManager
を継承しながら定義してある。(models
配下)class DataStore:
def init(self, keys: List[str] = [], data: List[Item] = []) -> None: 各種インスタンス変数の初期化
def len(self) -> int: データの長さを返却
def len(self) -> int: データのイテレータを返却
@staticmethod
def _hash(item: Dict[str, Hashable]) -> int: itemのハッシュ値を返却
def _insert(self, data: List[Item]) -> None: dataへのデータ挿入
def _update(self, data: List[Item]) -> None: dataへの要素更新
def _delete(self, data: List[Item]) -> None: dataの要素削除
def _clear(self) -> None: data等の削除
def _sweep_with_key(self) -> None: dataがMAXLENより大きいときのkeyによるsweep処理?(_insert()や_update()で利用)
def _sweep_without_key(self) -> None: dataがMAXLENより大きいときのkeyによらないsweep処理?
def get(self, item: Item) -> Optional[Item]:
def _pop(self, item: Item) -> Optional[Item]:
def find(self, query: Item = {}) -> List[Item]:
def _find_and_delete(self, query: Item = {}) -> List[Item]:
def _set(self, data: List[Item] = None) -> None:
async def wait(self) -> List[Item]:
TDataStore = TypeVar('TDataStore', bound=DataStore)
class DataStoreManager:
def init(self) -> None:
def getitem(self, name: str) -> Optional['DataStore']:
def contains(self, name: str) -> bool:
def create( 中略 ) -> None:
def get(self, name: str, type: Type[TDataStore]) -> TDataStore:
def _onmessage(self, msg: Any, ws: ClientWebSocketResponse) -> None:
def onmessage(self, msg: Any, ws: ClientWebSocketResponse) -> None:
def _set(self) -> None:
async def wait(self) -> None:
__init__.py
auth.py
client.py
request.py
store.py
typedefs.py
ws.py
tests
docs
(ドキュメント関連)pyproject.toml
(パッケージ関連)pytest.yml
(CI, GitHub Actions)auth.py
を読む