最近因為工作的關係,看了一些 parser combinator library 實作。
這次從 TypeScript 開始,解釋 parser combinator library 是怎麼構造的,再看看 OCaml 上的 parser combinator library: Angstrom 。
一個 combinator library 會提供很多這種函數,讓開發者像堆積木那樣,用小零件組出大零件,來完成工作。 parser combinators 是組合 parser 的工具。
我們的目標是處理一個沒有雙引號 "
的 CSV :
Death Stranding,1790,Kojima Productions Grand Theft Auto V,1299,Rockstart North Valheim,318,Iron Gate AB
如果我們只關心輸入和輸出,那一個 parser 可以看成一個「吃字串、吐出結果和剩下的字串」的函數。
用 TypeScript 描述起來長這樣:
type Option<T> = T | undefined; type Parser<T> = (input: string) => [Option<T>, string];
str
我們可以這樣實作一個函數,幫我們生出「取得某個字串 s
的 parser 」:
const str : (s: string) => Parser<string> = (s) => (input) => { if (input.startsWith(s)) { return [s, input.slice(s.length)]; } return [, input]; };
要是成功看見了字串 s
,就會得到 s
和剩下來的字串。要是沒拿到 s
,則會得到 undefined
和原封不動的 input
。
let result: Option<any>; let rest: string = ''; const foo = str("foo"); [result, rest] = foo("foobar"); console.log(result, rest); // "foo", "bar"
word
& digits
為了要處理 "Death Stranding"
和 1790
這種字串,還得做出可以處理英文字的 parser 跟可以處理數字的 parser 。
先準備一個可以處理特定範圍內的字元的 takeWhile
combinator :
const takeWhile : (f: (x: string) => boolean) => Parser<string> = (f) => (input) => { let i = 0; while(i < input.length) { if (!f(input[i])) break; ++i; } if (i === 0) return [, input]; return [input.slice(0, i), input.slice(i)]; };
接著就能做出處理英文的 word
parser 和處理數字的 digits
parser :
const between : (a: string, b: string) => (c: string) => boolean = (a, b) => (c) => { const code = c.charCodeAt(0); return a.charCodeAt(0) <= code && code <= b.charCodeAt(0); }; const isUpper = between("A", "Z"); const isLower = between("a", "z"); const word = takeWhile((c) => isUpper(c) || isLower(c) ); [result, rest] = word("foobar2000"); console.log(result, rest); // "foobar", "2000" const isDigits = between("0", "9"); const digits = takeWhile(isDigits); [result, rest] = digits("1984DEC10"); console.log(result, rest); // "1984", "DEC10"
seq
為了要處理任意數量的英文字,還需要一個「能串起數個 parser 」的 seq
。
const seq : <T, U>(pa: Parser<T>, pb: Parser<U>) => Parser<[T, U]> = (pa, pb) => (input) => { const [a, rest0] = pa(input); if (a === undefined) return [, input]; const [b, rest1] = pb(rest0); if (b === undefined) return [, input]; return [[a, b], rest1]; };
pure
& bind
實作 seq
時,會注意到,我們常常判斷 parser 跑完後的結果是成功還是失敗。
如果有一組函數,可以幫我們處理這件事,讓 parser 失敗時,自動傳回 [, input]
,其他時候再交給我們處理,那麼組合 parser 的時候就更輕鬆了。
這樣的一組函數通常叫做 pure
和 bind
。
const pure : <T>(x: T) => Parser<T> = (x) => (input) => [x, input]; const bind : <T, U>(pa: Parser<T>, f: (x: T) => Parser<U>) => Parser<U> = (pa, f) => (input) => { const [a, rest] = pa(input); if (a === undefined) return [, input]; const pb = f(a); return pb(rest); };
接著可以用 pure
和 bind
重做看看 seq
:
const seq : <T, U>( pa: Parser<T>, pb: Parser<U> ) => Parser<[T, U]> = (pa, pb) => bind(pa, (a) => bind(pb, (b) => pure([a, b])));
再做看看後面會用到的 seq3
和 seq5
:
const seq3 : <T, U, V>( pa: Parser<T>, pb: Parser<U>, pc: Parser<V> ) => Parser<[T, U, V]> = (pa, pb, pc) => bind(pa, (a) => bind(pb, (b) => bind(pc, (c) => pure([a, b, c]))));
const seq5 : <T, U, V, W, X>( pa: Parser<T>, pb: Parser<U>, pc: Parser<V>, pd: Parser<W>, pe: Parser<X> ) => Parser<[T, U, V, W, X]> = (pa, pb, pc, pd, pe) => bind(seq3(pa, pb, pc), ([a, b, c]) => bind(seq(pd, pe), ([d, e]) => pure([a, b, c, d, e])));
有時候我們也想直接替換結果,所以還會準備一個給 parser 用的 map
:
const map : <T, U>(pa: Parser<T>, f: (x: T) => U) => Parser<U> = (pa, f) => (input) => { const [a, rest] = pa(input); if (a === undefined) return [, input]; return [f(a), rest]; };
現在我們可以一次串起好幾個 parser 了:
const fullname = seq3(word, str(" "), word); [result, rest] = fullname("Isaac Huang"); // ["Isaac", " ", "Huang"], "" console.log(result, rest);
many
& sepBy
我們還需要一種用來取「零個或一個東西」的 parser ,才好做出組合任意數量 parser 的 combinator :
const zeroOrOne : <T>(pa: Parser<T>) => Parser<[] | [T]> = (pa) => (input) => { const [a, rest] = pa(input); if (a === undefined) return [[], input]; return [[a], rest]; };
接著就能做出將一個 parser 重複很多次的 many
:
const many : <T>(pa: Parser<T>) => Parser<T[]> = (pa) => bind(zeroOrOne(pa), (xs) => xs.length === 0 ? pure(xs) : bind(many(pa), (ys) => pure([...xs, ...ys])));
還有以一個 parser 分隔另一個 parser 的 sepBy
const skip : <T, U>(pa: Parser<T>, pb: Parser<U>) => Parser<T> = (pa, pb) => bind(pa, (a) => bind(pb, (_) => pure(a))); const skipFirst : <T, U>(pa: Parser<T>, pb: Parser<U>) => Parser<U> = (pa, pb) => bind(pa, (_) => bind(pb, (b) => pure(b)));
const sepBy : <T, U>(pa: Parser<T>, pb: Parser<U>) => Parser<T[]> = (pa, pb) => bind(zeroOrOne(pa), (xs) => { const p = skipFirst(pa, pb); return xs.length === 0 ? pure(xs) : bind(many(p), (ys) => pure([...xs, ...ys]))); }
product
現在我們可以處理不定數量英文單字的產品名稱和開發商名稱了:
const concat = (xs: string[]) => xs.join(""); const spaces = map(many(str(" ")), concat); const productName = map(sepBy(word, spaces), (xs) => xs.join(" ")); const developerName = productName; [result, rest] = productName("Valheim, 318"); // "Valheim", ", 318" console.log(result, rest); [result, rest] = productName("Death Stranding, 1790"); // "Death Stranding", ", 1790" console.log(result, rest);
const price : Parser<number> = map(digits, (str) => parseInt(str, 10)); const comma = str(","); const product = map( seq5(productName, comma, price, comma, developerName), ([product, _, price, __, developer]) => ({ product, price, developer }), ); const newline = str("\n"); const productList = sepBy(product, newline);
const csv = `Death Stranding,1790,Kojima Productions Grand Theft Auto V,1299,Rockstart North Valheim,318,Iron Gate AB`; [result, rest] = productList(csv); console.table(result);
寫 OCaml 時,會把要暴露給外界的 interface 放在 *.mli
檔案中,所以我們從 angstrom.mli
開始,看看有沒有什麼資訊,可以讓我們找到 library 入口或 parser 相關 type 。
type +'a t (** A parser for values of type ['a]. *)
在 OCaml 程式裡,通常會將一個檔案當成一個 module ,那個 module 最重要的 type 並不會跟該 module 同名(例如在其他語言中,你可能會看到 Parser.Parser
這種 type ),而是叫做 t
。
在 TypeScript 中,可能會寫成:
declare type Parser<A>; // 這不是合法的 TypeScript
在這裡先忽略那個 covariant +'a
,我們接著找找 t
從哪裡來。
在 angstrom.ml
中沒有看到 t
,但是在 module Unbuffered
裡面看到了:
include Parser
這行很可疑,我們往 parser.ml
找看看。
module Parser
並沒有暴露 interface 給外界,所以沒有 parser.mli
。在 parser.ml
裡我們發現:
type 'a t = { run : 'r. ( 'r failure -> ('a, 'r) success -> 'r State.t ) with_state }
翻譯成白話文大概是:
type t
帶著 type 參數 'a
,是個有著名為 run
field 的 record , run
是一個有著 type 參數 'r
的 function ,這個 function 帶著 parser 相關的狀態 with_state
,並且在得到兩個參數 failure
和 success
後,能得出 parse 結果 'r State.t
。
用 TypeScript 寫則是:
interface Parser<A> { run: WithState<<R> (fail: Failure<R>) => (succ: Success<A ,R>) => State<R> >; }
with_state
我們先看看 with_state
是什麼,再來解釋一下整個 Input.t
的結構。
with_state
是這樣定義的:
type 'a with_state = Input.t -> int -> More.t -> 'a
可以看成在替 'a
這個 type ,都帶上了 Input.t
、 int
和 More.t
三個狀態。看後續的程式碼,我們可以知道, Input.t
代表的是輸入, int
代表的是現在輸入讀到什麼位置 (position) , More.t
代表的是還有沒有輸入。
More.t
則只用來表示兩種狀態:
type t = | Complete | Incomplete
如果用 TypeScript 表示,可能會把 with_state
寫成:
interface WithState<T> { input: Input; pos: number; isComplete: boolean; value: T; }
在 functional programming language 中,靠參數來傳遞狀態是很常見的手法。
Input.t
Angstrom 支援動態增長的 input buffer ,把輸入從字串抽象成 Input.t
,但今天我們的目的是為了瞭解怎麼用 OCaml 寫 parser combinator ,可以先不深究這個 Input.t
。
只需要知道它內部儲存的是一個在整個輸入上移動的窗口,為了效率,它不會存已經處理過的部分。如果因為輸入不足,會得到未完成的結果狀態 Partial
,可以再餵輸入繼續 parse 下去。
這次我們的範例會交給 parse_string
函數處理,它一開始就會把整個字串變成一個 Input.t
,沒有增長不增長的問題。
'r failure
failure
會拿到以 string list
表示的 marks 和錯誤訊息 string
message ,最後才會得到 parse 結果 State.t
。
又因為在 failure
中可能會用到其他 parser ,所以加上 with_state
,把 parser 的狀態傳給它。
type 'a failure = (string list -> string -> 'a State.t) with_state
實際將 marks 和 message 組合成 State.Fail
的 function 是 fail_k
。我們現在不用看它。
('a, 'r) success
success
會拿到代表結果的 'a
,最後得到 parse 結果 State.t
。
在 success
中一樣會用到其他 parser ,一樣加上 with_state
,把 parser 狀態傳給它。
type ('a, 'r) success = ('a -> 'r State.t) with_state
實際將結果 'a
組合成 State.Done
的 function 是 success_k
。我們現在也不用理它。
至此我們可以看出來,要以 Angstrom parser 組合出新的 parser ,我們會把目前的 input, pos 和 more 傳給目前 parser 的 run
function ,再靠目前的 failure
和 success
做出新的 failure
與 success
function 。
就好像我們在 TypeScript parser 中,把 input 交給 Parser<T>
當作第一個參數一樣:
type Parser<T> = (input: string) => [Option<T>, string];
return
& >>=
return
和 >>=
是 Monad interface 上的函數。
先不要管 Monad 是什麼,如果一切順利的話, KK 在他的《Haskell,不年輕卻前衛的優雅程式語言》中,應該提過 Monad 和 typeclass ,甚至是 parser combinator 了。
一直傳遞 input, position, more 這件事,就像我們在 TypeScript parser 的 seq
中要一個一個跑子 parser 一樣麻煩。
我們可以把 Monad interface 上的 return
(也就是 TypeScript 版本中的 pure
)實現出來。
module Monad = struct let return v = { run = fun input pos more _fail succ -> succ input pos more v } (* 略 *) end
OCaml 可以定義 infix operator 。我們再實現 bind
的 infix 版本 >>=
,方便串接數個 parser 。
module Monad = struct (* 略 *) let (>>=) p f = { run = fun input pos more fail succ -> (* succ' 會把 p 解析出來的結果交給 f , * 再用新的 parser (f v) 處理之後的 * input' pos' more' *) let succ' input' pos' more' v = (f v).run input' pos' more' fail succ in (* 先試試 parser p ,成功的話把結果交給 succ' *) p.run input pos more fail succ' } end
Angstrom 還實現了「丟掉兩個 parse 結果中,左邊的那個」的 *>
與「丟掉兩個 parse 結果中,右邊的那個」的 <*
。
一如 TypeScript 版本的 skipFirst
和 skip
。
let ( *>) a b = (* a >>= fun _ -> b *) { run = fun input pos more fail succ -> let succ' input' pos' more' _ = b.run input' pos' more' fail succ in a.run input pos more fail succ' }
let (<* ) a b = (* a >>= fun x -> b >>| fun _ -> x *) { run = fun input pos more fail succ -> let succ0 input0 pos0 more0 x = let succ1 input1 pos1 more1 _ = succ input1 pos1 more1 x in b.run input0 pos0 more0 fail succ1 in a.run input pos more fail succ0 }
Angstrom 提供了 <|>
operator ,讓我們寫下 p <|> q
來先試試 parser p
再試試 parser q
。 <|>
特別之處在於,如果 p
失敗了,它不會把已經看過的輸入吃掉,以便 q
可以重試。
為此,實作時要創造自己的 fail'
,重用外面傳來的 pos
,而不是跑完 p
後的 pos'
:
let (<|>) p q = { run = fun input pos more fail succ -> let fail' input' pos' more' marks msg = if pos < Input.parser_committed_bytes input' then fail input' pos' more marks msg else q.run input' pos more' fail succ in p.run input pos more fail' succ }
string
OCaml 版的 string
是這樣實作的:
let string_ f s = let len = String.length s in ensure len (unsafe_apply_opt len ~f:(fun buffer ~off ~len -> let i = ref 0 in while !i < len && Char.equal (f (Bigstringaf.unsafe_get buffer (off + !i))) (f (String.unsafe_get s !i)) do incr i done; if len = !i then Ok (Bigstringaf.substring buffer ~off ~len) else Error "string")) let string s = string_ (fun x -> x) s
f
是為了在不分大小寫時,用來轉換每個 char
用的,可以先不理他。 ensure len
是用來保證接下來要處理的輸入還超過 len
長度。 unsafe_apply_opt
則是把 result
轉成 parser 結果用的工具,在此也不用深究。
可以看出來 string
會一個 char 一個 char 比較來自輸入的字串是不是和我們想取得的字串 s
一樣。
take_while
let rec count_while ~init ~f ~with_buffer = { run = fun input pos more fail succ -> let len = Input.count_while input (pos + init) ~f in let input_len = Input.length input in let init' = init + len in (* Check if the loop terminated because * it reached the end of the input buffer. * If so, then prompt for additional input * and continue. *) if pos + init' < input_len || more = Complete then succ input (pos + init') more (Input.apply input pos init' ~f:with_buffer) else let succ' input' pos' more' = (count_while ~init:init' ~f ~with_buffer).run input' pos' more' fail succ and fail' input' pos' more' = succ input' (pos' + init') more' (Input.apply input' pos' init' ~f:with_buffer) in prompt input pos fail' succ' } let take_while f = count_while ~init:0 ~f ~with_buffer:Bigstringaf.substring
這裡的 prompt
會試著取得更多輸入,我們不用管它。
take_while
會先靠 Input.count_while
,得知這次的 f
能滿足多少字元,再將結果從 input 裡面切出來。
現在可以做出 OCaml 版的 word
parser :
let is_upper c = let code = Char.code c in Char.code 'A' <= code && code <= Char.code 'Z' let is_lower c = let code = Char.code c in Char.code 'a' <= code && code <= Char.code 'z' let word = take_while (fun c -> is_upper c || is_lower c)
Angstrom 提供了 lift*
系列函數,我們不用特別做自己的 seq*
便能把數個 parser 連起來。
lift
和 >>|
(也就是 map
)是一樣的:
let (>>|) p f = { run = fun input pos more fail succ -> let succ' input' pos' more' v = succ input' pos' more' (f v) in p.run input pos more fail succ' } let (<$>) f m = m >>| f let lift f m = f <$> m
跑完當前的 parser p
後,再把結果 v
變成 f v
後,交給下一棒 succ
。不改變 input, pos, more ,把它們繼續傳下去。
在 lift2
的實作中我們也可以看到傳遞 input, pos, more 的過程:
let lift2 f m1 m2 = { run = fun input pos more fail succ -> let succ1 input1 pos1 more1 m1 = let succ2 input2 pos2 more2 m2 = succ input2 pos2 more2 (f m1 m2) in m2.run input1 pos1 more1 fail succ2 in m1.run input pos more fail succ1 }
我們可以用 lift3
來做一個 fullname
parser :
let fullname = lift3 (fun first space last -> first ^ space ^ last) word (string " ") word
用 lift3
就像是一次把三個 parser 的結果拿出來。
如果用前面提過的 return
和 >>=
來做 fullname
parser ,則會像是一次拿出一個 parser 的結果:
let fullname2 = word >>= (fun first -> (string " ") >>= (fun space -> word >>= (fun last -> return (first ^ space ^ last))))
是不是讓人想起 Callback Hell
若同時配上 OCaml 4.08 之後提供的新語法 let+
( applicative composition ,可以看成給任意 type 用的 async/await ),寫起來更直覺:
let fullname3 = let+ first = word and+ space = string " " and+ last = word in first ^ space ^ last
many
& sep_by
Angstrom 在處理不定數量的 parser 時,不像我們在 TypeScript 的實作那樣,先跑看看當成輸入的 parser 看結果。而是提供了製作遞迴 parser 的工具 fix
:
let fix f = let rec p = lazy (f r) and r = { run = fun buf pos more fail succ -> (Lazy.force p).run buf pos more fail succ } in r
這裡的 lazy
會把後面的 expression 變成一種等一下用 Lazy.force
才會 evaluate 的東西,有點像我們自己做 eta expansion 把 x
變成 fun () -> x
。
於是 fix f
的 f
就會拿到整個 fix f
的結果。 Angstrom 用 fix
來實作 many
:
let many p = fix (fun m -> (lift2 cons p m) <|> return [])
在這裏, lift2 cons p m
會先試試 parser p
,如果失敗了會得到 []
。如果成功了則試試 m
,也就是下一層的遞迴 many p
。
sep_by
也是這樣實作,差別只在它會把 separator parser s
的結果丟掉:
let sep_by1 s p = fix (fun m -> lift2 cons p ((s *> m) <|> return [])) let sep_by s p = let q = (s *> sep_by1 s p) in (lift2 cons p (q <|> return [])) <|> return []
product
現在可以做完我們的 product parser 了:
let spaces = take_while1 (fun c -> c = ' ') let comma = string "," let product_name = sep_by spaces word >>| String.concat " " let price = digits >>| int_of_string let developer_name = product_name
let product = let open Product in let+ p = product_name and+ _ = comma and+ n = price and+ _ = comma and+ d = developer_name in { product=p; price=n; developer=d } let product_list = sep_by end_of_line product
>>|
讓我們把結果轉成我們想要的樣子, Product
只是定義 product record 的形狀, let+
和 and+
讓我們更直覺地使用 parsers 。
在簡單的 TypeScript parser combinators 上,可以看到 type 怎麼樣幫助我們包裝複雜的結構,讓它們成為可以解釋、重複利用的積木。
和 Angstrom 比較後,可以看到 OCaml 額外提供的功能怎樣改善開發者的體驗。
也告訴我們 functional language 並不可怕。只要你會一些 TypeScript ,就能試試看 OCaml (或是它的表兄弟 ReScript )。