Try   HackMD

Functional Programming 風格的 C 語言實作

contributed by < jserv, JEFF1216, brian208579, happyincent >

何謂 Functional Programming?

Functional Programming (以下簡稱 FP) 是種 programming paradigm (開發典範),不是 design pattern 也不是 framework,更不是 language。簡單來說,FP 是種以數學函數為中心的「思考方式」與「程式風格」。

Why Functional Programming Matters 是篇 1984 年的論文,在 1990 年做小幅度修改,主要凸顯出 FP 的優點與重要特質。

一般來說,FP 的幾個特點如下:

  • 運算用的 function 大多只包含條件式及遞迴呼叫;
  • higher-order functions 主要有兩個特點:
    • function 可當作參數傳入 function 之中
    • function 可以回傳 function
  • 沒有隱含 (implicit) 的 Side Effect (副作用)
    • 將程式語言的函式視作看作是數學上的函數,乍看似乎沒問題,但副作用的存在不能保證每次呼叫的時候,傳回來的東西都會一樣,也就是程式本身,並非獨立於程式語言其他的部份。所以當程式出錯的時候,開發者需要走訪所有觸及的程式碼,方可確立問題的來源;
    • Haskell 等純函數語言,副作用是必須「明確」地用型別標示出來,而不是隱含的。副作用本身不必然是 primitive 的操作,可以是由語言的其他構件組成。而如 OCaml 這樣的 FP 語言,甚至提供 (explicit) reference 這樣的型別。所以每次呼叫函數,總可以保證傳回的數值不會變,也因此要寫對程式,也變得比較容易,或者說,較難寫程式但可確保容易寫出對的程式;
  • lazy evaluation
    • 當變數被定義時,不會立即 evaluate,而是等待要使用時才 evaluate 並儲存已計算的結果,所以不會重複 evaluate 同一個定義
    • 因為沒有副作用,所以可以把計算過的值記起來,以便下次再呼叫的時候直接拿來用,也就是 lazy evaluation 的精神。

延伸閱讀: Functional Programming: Concept & Example

程式語言引入函式傳遞是縮減複雜度的手法之一,然而,很多時候很難阻止開發者傳遞幾百或幾千行的函式內容。於是,程式語言的設計者則轉向焦點,思索如何規範開發者撰寫出意圖單一、實作單純的簡明函式,這即是不可變(immutable) 特性。當開發者無法任意更動變數內含值或物件狀態,便只能從既有變數值或物件狀態,來產生新的數值或物件狀態,這不啻貫徹數學上的數學函數嗎?換言之,開發者會將大任務切割為若干小任務,於是可察覺小任務的流程重複性,一旦開發者封裝重複的小任務,隨後即可在呼叫它們時,只專注在真正該關心的運算,並以函式傳遞運算。

若函式實作時貫徹不可變特性,該函式就會被稱為純函式 (pure function),相對地,就會是具有副作用的函式。值得留意的是,即便在 Haskell 一類純函數式語言中,也是有副作用的部份,不然就無法接受外界輸入,也不能進行程式對外的輸出。純函數式程式設計規範的是,副作用函式與純函式有個明確的界線,一邊是完全純粹的世界,不純的世界是另一邊;副作用本身對電腦上執行的程式是必要的特徵,它並非萬惡,真正令開發者畏懼的是,開發者搞不清楚這函式是否有副作用,進一步導致他們在不知道的地方改變什麼。

近年 FP 被重新提出且廣泛應用,是基於以下考量:

  1. 硬體單核處理器時脈出現瓶頸,中央處理器的運作時脈已經很難超過 5GHz,因此單執行緒程式速度也面臨瓶頸,但隨著半導體製程進步,單位面積可以塞入更多電晶體,因此處理器可納入更多核 (即 multi-core)。典型 imperative 寫法依賴 side effect,使得多核很難發揮效益,但 FP 沒有 side effect,很容易藉由平行運算,充分運用硬體能力;
  2. side effect 寫法很難單元測試,因為不保證每次的結果都ㄧ樣,但 FP 的 pure function 很適合單元測試
  3. OOP 的 object composition 需依賴 interface,容易造成 interface 混亂與 over design,但 FP 的 function compostion 不需要額外 interface,只要求你使用 Dataflow 思考,並使用 pure function 避免 side effect 即可

用 C 語言實作 FP 風格

在 FP 中,以下三種常見操作,搭配下圖食物對應:

  • map(): 配合給定行為,回傳一個陣列或鏈結串列
  • filter(): 用以檢索和篩選
  • reduce(): 將素材混合在一塊,得到最終結果

考慮 test-functional.c 的用法:

void print_int(void *v, void *args) {
    printf("[ op:%s ] %d\n", (char *) args, (int) (*(int *) v));
}
void print_char(void *v, void *args) {
    static int count = 0;
    printf("%c ", (char) (*(char *) v));
    if (++count > 10) {
        printf("\n");
        count = 0;
    }
}

bool isOdd(void *v, void *args) {
    return (bool) (*((int *) v) & 1);
}
void *twice(void *v, void *args) {
    int *o = malloc(sizeof(int));
    *o = *((int *) v) * 2;
    return o;
}

int main(int argc, char **argv) {
    gc_init();  // initialize the garbage collector

    iter(map(range(0, 10), twice, NULL), print_int,
         "twice"); /* 0, 2, 4, .., 20 */
    iter(filter(range(0, 10), isOdd, NULL), print_int,
         "odd");                                  /* 1, 3, 5, 7, 9 */
    iter(rangeChar('a', 'z'), print_char, NULL);  /* a, b, c, ..., z */
    iter(rangeChar(' ', 'z'), print_char, NULL);  // won't get printed
    iter(rangeChar('1', 'z'), print_char, NULL);

    gc_collect();

    exit(0);
}

執行輸出為:

[ op:twice ] 0
[ op:twice ] 2
[ op:twice ] 4
[ op:twice ] 6
[ op:twice ] 8
[ op:twice ] 10
[ op:twice ] 12
[ op:twice ] 14
[ op:twice ] 16
[ op:twice ] 18
[ op:twice ] 20
[ op:odd ] 1
[ op:odd ] 3
[ op:odd ] 5
[ op:odd ] 7
[ op:odd ] 9
a b c d e f g h i j k 
l m n o p q r s t u v 
w x y z 1 2 3 4 5 6 7 
8 9 : ; < = > ? @ A B 
C D E F G H I J K L M 
N O P Q R S T U V W X 
Y Z [ \ ] ^ _ ` a b c 
d e f g h i j k l m n 
o p q r s t u v w x y 
z

第 25 行的 map 操作,將給定 [0, 10] 區間的整數都透過 twice 函式乘上 2,再透過 print_int 個別輸出產生的數值。

第 27 行的 filter 操作,依據 isOdd 函式的判斷,自 [0, 10] 區間篩選出奇數, 再透過 print_int 個別輸出篩選出來的數值。

第 29 行到 31 行則展示 map 搭配 filter,列舉出給定 ASCII 字元集的範圍,並透過 print_char 函式輸出。

參見其他實作:

案例: 反轉 linked list 元素

以下展示採用 Functional Programming 風格開發的 singly-linked list 程式: reverse.c

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

/* integer linked list type */
typedef struct __int_list const *const int_list_t;
static int_list_t const Nil = NULL; /* empty list */

/* singly linked list element */
typedef struct __int_list {
    int_list_t next;
    int32_t const val;
} node_t;

/* A pointer to where we are about to store the result of computation */
typedef void *const CPS_Result;

/* prototypes for the continuations */
typedef void (*MakeListCallback)(int_list_t list, CPS_Result res);
void make_list(uint32_t const arr_size,
               int32_t const array[],
               int_list_t lst,
               MakeListCallback const cb,
               CPS_Result res);

typedef void (*ReversedListCallback)(int_list_t rlist, CPS_Result res);
void reverse(int_list_t list,
             int_list_t rlist,
             ReversedListCallback const cb,
             CPS_Result res);

typedef void (*VoidMappable)(int32_t const val);
void void_map_array(VoidMappable const cb,
                    uint32_t const size,
                    int32_t const *const arr);

void list2array(int_list_t list, CPS_Result res);

/* reverse a list and then store it in an array */
void reverse_toarray(int_list_t list, CPS_Result res) {
    reverse(list, Nil, list2array, res);
}

static void print_val(int32_t const val) { printf("%d ", val); }

#define ARRAY_SIZE(arr) (sizeof(arr) / sizeof(arr[0]))

int main(int argc, char *argv[]) {   
    int32_t arr[] = {99, 95, 90, 85, 80, 20, 75, 15, 10, 5};
    uint32_t const arr_size = ARRAY_SIZE(arr);

    int32_t res[arr_size];
    /* call make_list and pass reverse_toarray as the "continuation" */
    make_list(arr_size, arr, Nil, reverse_toarray, res);

    /* print out the reversed array */
    void_map_array(print_val, arr_size, res);
    printf("\n");
    return 0;  
}

/* construct a linked list from an array */
void make_list(uint32_t const arr_size,
               int32_t const arr[],
               int_list_t lst,
               MakeListCallback const cb,
               CPS_Result res) {
    if (!arr_size) {
        cb(lst, res);
        return;
    }
    make_list(arr_size - 1, arr,
        &(node_t){.val = arr[arr_size - 1], .next = lst}, cb, res);
}

/* transform a linked list into an array */
void list2array(int_list_t list, CPS_Result res) {
    if (Nil == list) return;
    int32_t *array = res;
    array[0] = list->val;
    list2array(list->next, array + 1);
}

void reverse(int_list_t list,
             int_list_t rlist, /* reversed list */
             ReversedListCallback const cb,
             CPS_Result res) {
    if (Nil == list) {
        cb(rlist, res);
        return;
    }
    reverse(list->next, &(node_t){.val = list->val, .next = rlist},
            cb, res);
}

/* iterate over an array and performs action cb on each element */
void void_map_array(VoidMappable const cb,
                    uint32_t const size,
                    int32_t const *const arr) {
    if (!size) return;
    cb(arr[0]);
    void_map_array(cb, size - 1, arr + 1);
}

精髓在於以下:

/* integer linked list type */
typedef struct __int_list const *const int_list_t;
static int_list_t const Nil = NULL; /* empty list */

這是個 const 結構指標且該指標的記憶體地址和內含值無法透過區域變數去改變,正如上述所及的 FP 精神。

再來,是關於 callback 的函式指標的理解,這是個函式指標,而 MakeListCallback 就是拿來接收 reverse_toarray 的這個函式,並且將 lstres 傳給 reverse 做下一個動作,打個比方:make_list(arr_size, arr, Nil, reverse_toarray, res); 會在這行程式碼去呼叫 make_list 副函式直到執行完透過 cb(lst,res) 將做好的 linked list 開頭的記憶體地址回傳給 reverse,所以整體的函式執行順序是:

main -->
    make_list -->
      reverse_toarray -->
        reverse -->
            list2array -->
                void_map_array

再來是建立 linked list 的部分:
這個是從陣列中的尾端建立起,亦即 5 是尾巴而 99 是頭,而在建立 linked list 過程中,lst 會將下一個結構的 next 儲存起來,並且將記憶體地址和數值複寫進下一個結構中,所以在 make_list 的順序是:

  • lst






foo



a

99

 



b

95

 



a:c->b:ref






c

90

 



b:c->c:ref






d

...



c:c->d:ref






e

5

 



d->e:ref






  • list






foo



a

99

 



b

95

 



a:c->b:data






c

...



b:c->c:data






d

5

 



c:c->d:data





e

NULL



d:c->e






reverse 將開頭 (99) 的 next 指向一開始的 rlist (Null),再透過 rlist 儲存新的 (99) 的記憶體地址,再依序將 新的 (95) 的 next 指向新的 (99),如圖:

  • rlist:






foo



a

99

 



b

NULL



a:c->b:data












foo



a

95

 



b

99

 



a:c->b:data






c

NULL



b:c->c:data






直到將全部的 linked list 做完反轉後才結束,如圖:

  • rlist






foo



a

5

 



b

10

 



a:c->b:data






c

...



b:c->c:data






d

99

 



c:c->d:data





e

NULL



d:c->e






最後的 linked list 再傳回 list2array 並輸出最終結果。本程式的技巧除了透過陣列來建立一個 linked list,過程中也利用 if 的判斷式來進行遞迴並且傳給下一個副函式,例如在 reverse 副函式中,其終止的條件是,當 list 的記憶體地址為 NULL (也就是到最後一個 5 時) 會停止遞迴並將其中的值傳給下一個副函式:

void reverse(int_list_t list,
             int_list_t rlist, /* reversed list */
             ReversedListCallback const cb,
             CPS_Result res) {
    if (Nil == list) {
        cb(rlist, res);
        return;
    }
    reverse(list->next, &(node_t) K1, cb, res);
}

注意到這裡的 CPS:

Continuation Passing Style (CPS): Programming style characterized by functions with an additional continuation argument, which call their continuation instead of returning to their caller.

案例: 對 Linked List 元素進行合併排序

利用 merge sort 我們減少了程式碼中的 side effect 並且將每個副函式幾乎變成了 pure function。也就是說,每個函式我們都只用了區域變數,並讓其中的值只受到該區域的副函式影響,直到結束後傳遞結果為止。

  • merge sort
    merge sort 簡單來說就是將一個 linked list 拆成一個又一個獨立的結構再去作排列和重組。如圖:

mergesort.c

我們運用兩指標 fast 和 slow,當 fast 不為 NULL 時,fast 往前讀取記憶體地址,若下個 fast 依舊不為 NULL,則 slow 和 fast 兩個指標繼續往前,直到 fast 指向 NULL 為止。如下圖:

  • list






foo



a

99

 



b

95

 



a:c->b:ref






g

slow



a:c->g:data






c

85

 



b:c->c:ref






h

fast



b:c->h:data






d

80

 



c:c->d:ref






e

. ..

 



d:c->e:ref






f

NULL



e:c->f






若 fast 不為 NULL 則變成:







foo



a

99

 



b

95

 



a:c->b:ref






g

slow



a:c->g:data






c

85

 



b:c->c:ref






d

80

 



c:c->d:ref






h

fast



c:c->h:data






e

. ..

 



d:c->e:ref






f

NULL



e:c->f






若 fast 仍不為 NULL 的話則 fast 和 slow 各前進一格。







foo



a

99

 



b

95

 



a:c->b:ref






c

85

 



b:c->c:ref






g

slow



b:c->g:data






d

80

 



c:c->d:ref






e

. ..

 



d:c->e:ref






h

fast



d:c->h:data






f

NULL



e:c->f






直到 fast 碰到 NULL 為止,則該 list 就會被分為兩個和其他,以「案例: 反轉 linked list 元素」的輸入來說,就是 (99, 90, 95, 6, 85, 20, 75, 15) 一群,(10, 5) 為另一群這樣分下去。直到變成一個獨立的 linked list 後再去連接起來。

本次的程式執行順序為:

main > make_list > mergesort > partition > partition > > mergesort > mergelist > mergesort > list2array > void_map_array

採用 non-intrusive 的 linked list 來實現,從程式碼中

typedef struct __list * list_t;
typedef struct __list {
    list_t next;
} node_t;

typedef struct __ele {
    int32_t const val;
    list_t const list;
} ele_t; 

我們將 vallist 的值宣告為 const,亦即無法對其值進行改變,然後將next 宣告為一個指標的指標,圖示如下:







foo



a

val

list



g

next



a:c->g:data






b

val

list



g:c->b:ref






  • Nil

而在程式實現中,我們必須設計一個 Nil 結構參數,其目的是為了在各個函式之間呼叫,我們是以ele_t 或是list_t 的結構型態進行回傳資料和遞迴,所以我們無法直接利用 NULL 來作為函式中斷遞迴的判斷,因此我們令Nil static const ele_t Nil = {.val = 0, .list = &(node_t){ .next = NULL }}; 意思是當我們指向 Nil 時候就表示該記憶體地址指向 Null。如圖:







foo



a

val

list



g

next



a:c->g:data






b

Nil(0)

list



c

NULL



b:c->c:data






g:c->b:ref






  • container_of (ptr, type, member)

container_of 這樣的巨集用於 Linux 核心和一些專案中,作用是根據給定的結構體變數其中的一個成員變數的指標,來獲取指向整個結構體變數的指標。舉例來說,給定以下結構體:

struct demo_struct {
    type1 member1;
    type2 member2;
    type3 member3;
    type4 member4;
};      

假設結構體變數 demo 在執行時期的記憶體位置呈現如下圖:

     demo
 +-------------+ 0xA000
 |   member1   |
 +-------------+ 0xA004
 |   member2   |
 +-------------+ 0xA010
 |   member3   |
 +-------------+ 0xA018
 |   member4   |
 +-------------+

一旦執行程式碼 container_of(memp, struct demo_struct, type3),會使取得的記憶體地址為該結構的初始地址,亦即 0xA000,而在作 merge sort 時,前述 fast 和 slow 指標必須是指向該 linked list 結構的初始記憶體地址,才可以對它作 dereference ,否則就會出現記憶體操作的錯誤,因次我們可定義list_entry 如下:

#define list_entry(node, type, member) \
container_of(node, type, member)

例如在 partition 中,我們要對將其拆開時,便會使用到:

fast = list_entry(head->list->next, ele_t, list);
  • mergeLists

最後是關於 mergeLists 的實作,類似之前的程式碼,只是在於進行遞迴呼叫時,要用到 tmp 來當作暫存器儲存結構的記憶體地址。

if (a->list->next == NULL) { return b; } else if (b->list->next == NULL) { return a; } if (a->val <= b->val) { mergedList = a; ele_t *tmp = mergeLists(list_entry(a->list->next, ele_t, list), b); mergedList->list->next = (list_t)(&(tmp->list)); } else { mergedList = b; ele_t *tmp = mergeLists(a, list_entry(b->list->next, ele_t, list)); mergedList->list->next = (list_t)(&(tmp->list)); }

其中第 13 行將回傳 (a, Nil)mergeLists

我們假設 a 是上述示意圖 95 的結構體,b 是 90 的結構體,在比較完大小後,在 tmp = mergeLists(a, list_entry(b->list->next, ele_t, list)); 中會進行遞迴呼叫,隨即將 a 結構體指標和 Nil 回傳給 mergeLists 函式,經由 if 判斷式後回傳 a 結構體指標給 tmp 最後將 90 的 next ,也就是 b->list->next 由原本的指向 Nil 改成指向 95 的結構體,以此遞迴下去完成 non-intrusive linked-list 的重組。最後再輸出該 linked list,完成此次 FP 風格的實作。

gdb下的觀察結果:

breakpoint 4, partition (head=0x7ffffffedee0, front=0x7ffffffedde0, back=0x7ffffffedde8) at mergesort.c:124
124         if (head == NULL || head->list->next == (list_t)(&(Nil.list))) {
(gdb)
129             slow = head;
(gdb)
130             fast = list_entry(head->list->next, ele_t, list);
(gdb)
132             while(fast->list->next != NULL) {
(gdb) p *head
$1 = {val = 99, list = 0x7ffffffeded8}
(gdb) p *fast
$2 = {val = 95, list = 0x7ffffffedf48}
(gdb) p *(head->list->next)
$3 = {next = 0x7ffffffedf48}
(gdb) p *(head->list)
$4 = {next = 0x7ffffffedf58}
(gdb) p &(head->list>next)
No symbol "next" in current context.
(gdb) p &(head->list->next)
$5 = (list_t *) 0x7ffffffeded8
(gdb) p (head->list)
$6 = (const list_t) 0x7ffffffeded8
(gdb) p (head->list->next)
$7 = (list_t) 0x7ffffffedf58
(gdb) p &(fast->list)
$8 = (const list_t *) 0x7ffffffedf58