目的: 檢驗學員對 bitops 及記憶體配置的認知
作答表單: 測驗 1
作答表單: 測驗 2
作答表單: 測驗 3
1
以下程式碼建構在 C 語言基本型態之上,打造更高精密度 (precision) 的整數運算,定義如下:
#include <inttypes.h>
#include <stdint.h>
/* mpi: Multi-Precision Integers */
typedef struct {
uint32_t *data;
size_t capacity;
} mpi_t[1];
typedef size_t mp_bitcnt_t;
其記憶體管理相關的操作如下:
void mpi_init(mpi_t rop)
{
rop->capacity = 0;
rop->data = NULL;
}
void mpi_clear(mpi_t rop)
{
free(rop->data);
}
void mpi_enlarge(mpi_t rop, size_t capacity)
{
if (capacity > rop->capacity) {
size_t min = rop->capacity;
rop->capacity = capacity;
rop->data = realloc(rop->data, capacity * 4);
if (!rop->data && capacity != 0) {
fprintf(stderr, "Out of memory (%zu words requested)\n", capacity);
abort();
}
for (size_t n = min; n < capacity; ++n)
rop->data[n] = 0;
}
}
void mpi_compact(mpi_t rop)
{
size_t capacity = rop->capacity;
if (rop->capacity == 0)
return;
for (size_t i = rop->capacity - 1; i != (size_t) -1; --i) {
if (rop->data[i])
break;
capacity--;
}
assert(capacity != (size_t) -1);
rop->data = realloc(rop->data, capacity * 4);
rop->capacity = capacity;
if (!rop->data && capacity != 0) {
fprintf(stderr, "Out of memory (%zu words requested)\n", capacity);
abort();
}
}
接著準備基本操作:
/* ceiling division without needing floating-point operations. */
static size_t ceil_div(size_t n, size_t d)
{
return AAAA;
}
#define INTMAX BBBB
void mpi_set_u64(mpi_t rop, uint64_t op)
{
size_t capacity = ceil_div(64, 31);
mpi_enlarge(rop, capacity);
for (size_t n = 0; n < capacity; ++n) {
rop->data[n] = op & INTMAX;
op >>= 31;
}
for (size_t n = capacity; n < rop->capacity; ++n)
rop->data[n] = 0;
}
基本的乘法運算:
static void mpi_mul_naive(mpi_t rop, const mpi_t op1, const mpi_t op2)
{
size_t capacity = op1->capacity + op2->capacity;
mpi_t tmp;
mpi_init(tmp);
mpi_enlarge(tmp, capacity);
for (size_t n = 0; n < tmp->capacity; ++n)
tmp->data[n] = 0;
for (size_t n = 0; n < op1->capacity; ++n) {
for (size_t m = 0; m < op2->capacity; ++m) {
uint64_t r = (uint64_t) op1->data[n] * op2->data[m];
uint64_t c = 0;
for (size_t k = CCCC; c || r; ++k) {
if (k >= tmp->capacity)
mpi_enlarge(tmp, tmp->capacity + 1);
tmp->data[k] += (r & INTMAX) + c;
r >>= 31;
c = tmp->data[k] >> 31;
tmp->data[k] &= INTMAX;
}
}
}
mpi_set(rop, tmp);
mpi_compact(rop);
mpi_clear(tmp);
}
除法和餘數運算:
void mpi_fdiv_qr(mpi_t q, mpi_t r, const mpi_t n, const mpi_t d)
{
mpi_t n0, d0;
mpi_init(n0);
mpi_init(d0);
mpi_set(n0, n);
mpi_set(d0, d);
if (mpi_cmp_u32(d0, 0) == 0) {
fprintf(stderr, "Division by zero\n");
abort();
}
mpi_set_u32(q, 0);
mpi_set_u32(r, 0);
size_t start = mpi_sizeinbase(n0, 2) - 1;
for (size_t i = start; i != (size_t) -1; --i) {
mpi_mul_2exp(r, r, DDDD);
if (mpi_testbit(n0, i) != 0)
mpi_setbit(r, 0);
if (mpi_cmp(r, d0) >= 0) {
mpi_sub(r, r, d0);
mpi_setbit(q, i);
}
}
mpi_clear(n0);
mpi_clear(d0);
}
最大公因數運算:
void mpi_gcd(mpi_t rop, const mpi_t op1, const mpi_t op2)
{
if (mpi_cmp_u32(op2, 0) == 0) {
mpi_set(rop, op1);
return;
}
mpi_t q, r;
mpi_init(q);
mpi_init(r);
mpi_fdiv_qr(q, r, op1, op2);
EEEE;
mpi_clear(q);
mpi_clear(r);
}
測試程式碼:
printf("mpi_fdiv_qr\n");
{
mpi_t n, d, q, r;
mpi_init(n);
mpi_init(d);
mpi_init(q);
mpi_init(r);
mpi_set_str(n, "549755813889", 10);
mpi_set_str(d, "1234", 10);
mpi_fdiv_qr(q, r, n, d);
assert(mpi_cmp_u32(q, 445507142) == 0);
assert(mpi_cmp_u32(r, 661) == 0);
mpi_clear(n);
mpi_clear(d);
mpi_clear(q);
mpi_clear(r);
}
printf("GCD test\n");
{
mpi_t a, b, r;
mpi_init(a);
mpi_init(b);
mpi_init(r);
mpi_set_str(a, "2310", 10);
mpi_set_str(b, "46189", 10);
mpi_gcd(r, a, b);
assert(mpi_cmp_u32(r, 11) == 0);
mpi_clear(a);
mpi_clear(b);
mpi_clear(r);
}
假設記憶體配置都會成功,已知不會遇到任何 abort
或 assert
錯誤,依據〈最大公因數特性和實作考量〉及〈Linux 核心原始程式碼的整數除法〉,補完程式碼並滿足以下規範:
;
:
或 ?
字元ceil_div
的第一個參數為 n
,第二個參數為 d
,若要表示二者的加法操作,書寫為 n+d
,而非 d+n
U
, UL
或 ULL
延伸問題:
2
SIMD within a register (SWAR) 是軟體最佳化技巧之一,以下展示 SWAR 運用於 64 位元微處理器架構,原本判斷 2 個 32 位元寬度的整數是否都是奇數 (odd),可能會這樣撰寫:
#include <stdint.h>
bool both_odd(uint32_t x, uint32_t y)
{
return (x & 1) && (y & 1);
}
但我們可先組合 (compound) 2 個 32 位元寬度的整數為 1 個 64 位元整數,再運用特製的 bitmask,從而減少運算量:
static uint64_t SWAR_ODD_MASK = (1L << 32) + 1;
bool both_odd_swar(uint64_t xy) {
return (xy & SWAR_ODD_MASK) == SWAR_ODD_MASK;
}
測試程式:
static inline uint64_t bit_compound(uint32_t x, uint32_t y)
{
return ((0L + x) << 32) | ((y + 0L) & (-1L >> 32));
}
int main()
{
int x = 12345678;
int y = 9012345;
uint64_t xy = bit_compound(x, y);
printf("%d, %d\n", both_odd(x, y), both_odd_swar(xy));
}
延伸閱讀: SIMD and SWAR Techniques
在 Linux 核心原始程式碼中,lib/string.c 具備 memchr 的實作:
/**
* memchr - Find a character in an area of memory.
* @s: The memory area
* @c: The byte to search for
* @n: The size of the area.
*
* returns the address of the first occurrence of @c, or %NULL
* if @c is not found
*/
void *memchr(const void *s, int c, size_t n)
{
const unsigned char *p = s;
while (n-- != 0) {
if ((unsigned char)c == *p++) {
return (void *)(p - 1);
}
}
return NULL;
}
這段程式會逐字元去檢查輸入的字串,直至與 c
相符。C99 規格書 6.5.2.4 提到:
The result of the postfix ++ operator is the value of the operand. After the result is obtained, the value of the operand is incremented. (That is, the value 1 of the appropriate type is added to it.) See the discussions of additive operators and compound assignment for information on constraints, types, and conversions and the effects of operations on pointers.
測試程式:
int main()
{
const char str[] = "https://wiki.csie.ncku.edu.tw";
const char ch = '.';
char *ret = memchr_opt(str, ch, strlen(str));
printf("String after |%c| is - |%s|\n", ch, ret);
return 0;
}
預期執行結果:
String after |.| is - |.csie.ncku.edu.tw|
利用上述 SIMD within a register (SWAR) 的技巧,我們可改寫為以下 memchr_opt
函式:
#include <limits.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>
/* Nonzero if X is not aligned on a "long" boundary */
#define UNALIGNED(X) ((long) X & (sizeof(long) - 1))
/* How many bytes are loaded each iteration of the word copy loop */
#define LBLOCKSIZE (sizeof(long))
/* Threshhold for punting to the bytewise iterator */
#define TOO_SMALL(LEN) ((LEN) < LBLOCKSIZE)
#if LONG_MAX == 2147483647L
#define DETECT_NULL(X) (((X) - (0x01010101)) & ~(X) & (0x80808080))
#else
#if LONG_MAX == 9223372036854775807L
/* Nonzero if X (a long int) contains a NULL byte. */
#define DETECT_NULL(X) \
(((X) - (0x0101010101010101)) & ~(X) & (0x8080808080808080))
#else
#error long int is not a 32bit or 64bit type.
#endif
#endif
/* @return nonzero if (long)X contains the byte used to fill MASK. */
#define DETECT_CHAR(X, mask) DETECT_NULL(AAAA)
void *memchr_opt(const void *str, int c, size_t len)
{
const unsigned char *src = (const unsigned char *) str;
unsigned char d = c;
while (UNALIGNED(src)) {
if (!len--)
return NULL;
if (*src == d)
return (void *) src;
src++;
}
if (!TOO_SMALL(len)) {
/* If we get this far, len is large and src is word-aligned. */
/* The fast code reads the source one word at a time and only performs
* a bytewise search on word-sized segments if they contain the search
* character. This is detected by XORing the word-sized segment with a
* word-sized block of the search character, and then checking for the
* presence of NULL in the result.
*/
unsigned long *asrc = (unsigned long *) src;
unsigned long mask = d << 8 | d;
mask |= mask << 16;
for (unsigned int i = 32; i < LBLOCKSIZE * 8; i <<= 1)
mask |= BBBB;
while (len >= LBLOCKSIZE) {
if (DETECT_CHAR(CCCC, mask))
break;
asrc++;
len -= LBLOCKSIZE;
}
/* If there are fewer than LBLOCKSIZE characters left, then we resort to
* the bytewise loop.
*/
src = (unsigned char *) asrc;
}
while (len--) {
if (*src == d)
return (void *) src;
src++;
}
return NULL;
}
留意巨集 DETECT_NULL
: 首先會先將輸入字串跟 mask
做 XOR 運算,若輸入字串中有任何字元與要搜尋的字元相符,進行 XOR 後就會出現 00000000
(全為 0 的位元組)。接著可看到這個巨集:
#if LONG_MAX == 2147483647L
#define DETECT_NULL(X) (((X) - (0x01010101)) & ~(X) & (0x80808080))
#else
#if LONG_MAX == 9223372036854775807L
/* 當 X(型態為 long)包含 NULL byte 時會是非 0 */
#define DETECT_NULL(X) \
(((X) - (0x0101010101010101)) & ~(X) & (0x8080808080808080))
這段巨集會依據 long
型態是否為 32 位元或 64 位元,做出相對應的位元運算。
(X) - (0x01010101)
:
若以每個位元組(byte)來看,當
X
的任一個位元組為0
,那麼和0x01010101
相減後,對應的位元組就會變成0xFF
。
和上一步結果再跟 ~X
做 AND 運算:
當
X
的某個位元組為0
,於是~X
中對應的位元組也會是0xFF
。
舉例來說,若 A
和B
都是 32 位元的 long,其中 A = 0x00FF00FF
,B = 0x12345678
。 特意把 A
的第一與第三個位元組設為 0
:
A - 0x01010101 = 0xFFFEFFFE
0xFFFEFFFE & ~A = 0xFF00FF00
接著看 B
(它沒有任何位元組為 0):
B - 0x01010101 = 0x11335577
0x11335577 & ~B = 0x01030107
最後這些結果再和 0x80808080
做 bitwise AND 運算。可把 1 個位元組分成左右各 4 個 bits 看,若原先確實有包含 0 byte,那它在最高的一組 4 個 bits 會有 1
。示意圖如下:
程式先透過 XOR 檢查目標字元是否存在(其中一個位元組為 0),如果存在,它就會觸發對應的條件判斷並立即 break
(表示目標字元已找到)。
請補完程式碼,使上述 memchr_opt
的實作符合 memchr 行為,作答規範:
延伸問題:
memchr_opt
,設計實驗來觀察隨著字串長度和特定 pattern 變化的效能影響下一步:貢獻程式碼到 Linux 核心
Phoronix 報導
代表行程接收到 signal 後會產生什麼行為,並且每個行程有各自的 signal disposition ,基本的預設行為有下列幾種
行程可以透過 sigaction(2) 改變 signal disposition ,有以下幾種行為可以選擇
如果利用 signal handler 來處理該 signal ,預設是將該 signal handler 指向的函式的函式資訊框 (function frame,簡稱 fp) 加在原本行程的堆疊當中,也可用 signalstack(2) 使該函式資訊框使用其他的堆疊。
Linux 核心在每次轉移回使用者模式時,都會檢查是否有正在等待的 signal 並檢查是否有對應的 signal handler,如果有的話會進行以下動作
sigprocmask()
來建立在 act->sa_mask
當中的 signal 都會被加到該執行緒的 signal mask 當中。sigreturn()
,恢復執行緒的狀態回到呼叫 signal handler 之前,然後 Linux 核心把控制權轉回給使用者模式,繼續執行 signal handler 被呼叫之前的指令。如果 signal handler 因為 siglongjmp()
或使用 execve()
啟動新的程式而沒有回傳,則不會回傳 signal trampoline 也不會呼叫 sigreturn()
,此時這是開發者的責任要去恢復 signal mask 的狀態。
以 Linux 核心的角度來看,它不知道 signal handler 跟其他使用者層級的程式碼之間的差別,自然也不知道目前執行的是誰,所有 signal handler 需要的相關資訊都儲存在使用者層級的暫存器和堆疊中,所以巢狀呼叫的 signal handler 數量上限是被使用者層級的堆疊大小所限制。
sigaction() 系統呼叫可以改變行程接收到特定 signal 時的行為。
signum
是用來指定該 signal 的數字(不能指定 SIGKILL
和 SIGSTOP
)。
struct sigaction
定義如下
struct sigaction {
void (*sa_handler)(int);
void (*sa_sigaction)(int, siginfo_t *, void *);
sigset_t sa_mask;
int sa_flags;
void (*sa_restorer)(void);
};
特別注意到 sa_restorer
並非是給使用者使用的,詳見 sigreturn(2) 。
如上節所說的, sa_handler
是行程接收到 signum
這個 signal 時會採取的行為
SIG_DFL
代表採取預設行為SIG_IGN
代表忽略該 signalsa_flags
中設定 SA_SIGINFO
,則 sa_sigaction
就指向處理 signum
的 signal-hadling functionsignal handler 的函式定義如下
void handler(int sig, siginfo_t *info, void *ucontext);
以下解說參數
sig
: 觸發此 handler function 的 signal number (和 signum
相同)info
: 指向 siginfo_t
的指標,該結構體包含 signal 的詳細資訊。詳見 sigaction(2) 。ucontext
: a pointer to a ucontext_t
structure, cast to void *
。此指標指向的結構體包含了 Linux 核心 存在 userspace stack 中的 signal context information 。通常此參數不會被使用到,詳見 getcontext(3) 。sa_mask
則代表在 signal handler 執行期間會被阻塞著的 signals ,並且觸發 signal handler 的 signal 本身也會被阻塞。
sa_flags
代表改變該 signal 行為的標籤集合,有數種標籤詳見 sigaction(2)
在 泛 Sytem V 環境當中,我們可以透過以下幾種結構體和函式來控制行程當中不同執行緒的 user-level context switch 。
mcontext_t
: machine-dependent 且不開放給使用者操作。ucontext_t
: 定義如下typedef struct ucontext_t {
struct ucontext_t *uc_link;
sigset_t uc_sigmask;
stack_t uc_stack;
mcontext_t uc_mcontext;
...
} ucontext_t;
uc_link
: 指向在目前 context 被終止後會接續執行的 context 。uc_sigmask
: 被此 context 阻塞的 signals 集合。uc_stack
: 此 context 使用的堆疊空間。uc_mcontext
: 被儲存的 context 的 machine-specific 表示法,包含 calling thread's machine registers 。getcontext()
會將 ucp
指向的結構體初始化並指向目前執行的 context 。
setcontext()
會繼續執行 ucp
指向的 user context ,此 context 有三種方式獲取
getcontext()
makecontext()
最早實作 user-level context switch 此機制的方式是透過 setjmp()
/longjmp()
,不過這樣的方式沒有定義如何處理 signal context ,之後 sigsetjmp()
/siglongjmp()
讓我們可以處理 signal context 。當 signal 觸發某個 signal handler 時,目前的 user context 會被暫存而 Linux 核心 會為 signal handler 建立一個新的 user context 。我們不該使用 longjmp()
函式來離開 signal handler ,這樣的行為是未定義的,取而代之應該使用 siglongjmp()
或 setcontext()
。
注意到除非我們自己設置額外儲存裝置進行記錄,不然我們無從得知 getcontext()
的回傳行為是從首次呼叫 getcontext()
還是透過 setcontext()
來回傳。因為 register 會被重新恢復,所以利用 register variable 來記錄是無效的。
3
以下程式碼實作並行程式設計: 排程器原理提及的使用者層級的執行緒 (user-level thread; ULT,也可稱為 green thread),原理是透過 SIGRTMIN signal 來模擬作業系統的 timer 中斷,設定的時間間隔為 10 ms,於是每 10 ms 觸發模擬的 timer 中斷,帶動 ULT 排程器,實作 round-robin 排程策略。留意到 trampoline 的使用:
Trampolines (sometimes referred to as indirect jump vectors) are memory locations holding addresses pointing to interrupt service routines, I/O routines, etc. Execution jumps into the trampoline and then immediately jumps out, or bounces, hence the term trampoline.
留意 getcontext 及 makecontext 的使用。
採 round-robin 排程。
標頭檔: (檔名: coro.h
)
#pragma once
#include <ucontext.h>
/* Function type for user-level thread entry points.
* @args is a pointer to user-specified arguments.
*/
typedef void (*coro_entry)(void *args);
/* Create a new user-level thread that is marked as runnable.
* The thread's entry point is @entry with the given @args.
* The thread will not start executing until the next call to coro_yield().
* Returns a unique thread identifier on success, or -1 on failure.
*/
int coro_create(coro_entry entry, void *args);
/* Yield execution to the next available user-level thread.
* The scheduler may choose to run a different thread or resume the current one.
*/
void coro_yield(void);
/* Terminate the currently running user-level thread.
* This call does not return.
*/
void coro_exit(void);
/* Terminate the user-level thread identified by @id.
* The target thread is forcefully stopped and cleaned up.
*/
void coro_destroy(int id);
/* Retrieve the identifier of the currently running user-level thread.
* Returns the unique identifier of the calling thread.
*/
int coro_getid(void);
/* Receive a value sent from another user-level thread.
* On return, the object pointed by @who is set to the identifier of the sender.
* Returns the value sent.
*/
int coro_recv(int *who);
/* Send an integer value @val to the user-level thread identified by @dest.
*/
void coro_send(int dest, int val);
使用範例: (檔名 prime.c
)
#include <stdio.h>
#include <stdlib.h>
#include "coro.h"
static void primeproc(void *_ __attribute__((unused)))
{
/* Receive a prime number from the left neighbor */
int p = coro_recv(NULL);
printf("%d ", p);
/* Create a new user-level thread for the right neighbor to continue the
* chain.
*/
int id = coro_create(primeproc, NULL);
if (id == -1)
exit(0);
/* Continuously filter out numbers divisible by the current prime,
* sending only those that are not divisible to the right neighbor.
*/
while (1) {
int i = coro_recv(NULL);
if (i % p)
coro_send(id, i);
}
}
void coro_main(void *_ __attribute__((unused)))
{
/* Create the first prime processing user-level thread in the chain */
int id = coro_create(primeproc, NULL);
/* Continuously send all integers starting from 2 into the chain */
for (int i = 2;; i++)
coro_send(id, i);
}
執行輸出:
2 3 5 ... 8089 8093 8101
使用範例: (檔名 yield.c
)
#include <stdio.h>
#include "coro.h"
static void yield_thread(void *_ __attribute__((unused)))
{
printf("Current environment %08x.\n", coro_getid());
for (int i = 0; i < 5; i++) {
coro_yield();
printf("Back in environment %08x, iteration %d.\n", coro_getid(), i);
}
printf("All done in environment %08x.\n", coro_getid());
}
void coro_main(void *_ __attribute__((unused)))
{
for (int i = 0; i < 3; i++)
coro_create(yield_thread, NULL);
coro_exit();
}
參考執行輸出:
Current environment 00000001.
Current environment 00000002.
Current environment 00000003.
Back in environment 00000001, iteration 0.
Back in environment 00000002, iteration 0.
Back in environment 00000003, iteration 0.
...
All done in environment 00000001.
Back in environment 00000002, iteration 4.
All done in environment 00000002.
Back in environment 00000003, iteration 4.
All done in environment 00000003.
coro.c
原始程式碼 (部分遮蔽)
作答規範:
ENV_
開頭coro_
開頭參考資訊: